In this notebook we perform the following steps.
This first section sets the foundations for data handling, modeling, and portfolio simulation.
# --- Setup ---
# Core stdlib + scientific stack used throughout the notebook.
import os, re, math, json, warnings
import random
from pathlib import Path
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Tuple, Iterable, Dict
import matplotlib.pyplot as plt
import itertools
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, QuantileTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.base import clone
# NOTE(review): ElasticNet is imported twice (here and above) — harmless but redundant.
from sklearn.linear_model import Ridge, ElasticNet, HuberRegressor
from sklearn.ensemble import HistGradientBoostingRegressor
warnings.filterwarnings('ignore')  # notebook convenience; silences ALL warnings, incl. pandas deprecations
# ----- Paths & dates -----
DATA_ROOT = Path('data/tickers')  # root folder holding one CSV per ticker
CACHE_PARQUET = Path('data/equity_equitydata.parquet')  # combined-dataset parquet cache
START_DATE = '2014-01-01'
END_DATE = '2024-12-31'
# ---- Signals / targets ----
# Modeling target(s): forward log returns horizons (in trading days)
TARGET_HORIZONS = [5, 21] # trading days, 1-week and ~1-month
MAX_H = max(TARGET_HORIZONS) # used to embargo tail when training
# Walk-forward CV settings
TRAIN_YEARS = 3 # expanding window starts at 3 years
STEP_MONTHS = 1 # move forward one month between folds
EMBARGO_DAYS = MAX_H # gap between train and test windows
# Portfolio & costs
REBALANCE_FREQ = 'W-FRI' # 'W-FRI' or 'M' for monthly
LONG_SHORT = True # True -> long top q & short bottom q; False -> long-only
TOP_QUANTILE = 0.2 # cross-sectional top quantile to hold
HOLDING_PERIOD = 5 # days to hold each signal
TCOST_BPS = 20 # transaction cost in basis points per turnover
# Master toggle: run multi-variant sweep?
USE_VARIANT_SWEEP = False # True only when exploring
# Production model used by the whole notebook
PROD_MODEL_KIND = 'ridge' #{"ridge","elasticnet","huber","hgbt"}
PROD_MODEL_PARAMS = {}
# Single source of truth for all earlier sections (_fitpredict_one_horizon, etc.)
MODEL_KIND, MODEL_PARAMS = PROD_MODEL_KIND, PROD_MODEL_PARAMS
# ---- Misc ----
DTYPE = np.float32  # features are downcast to float32 to save memory
RANDOM_STATE = 42
# Seed every RNG source we control for reproducibility.
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)
os.environ["PYTHONHASHSEED"] = str(RANDOM_STATE)
# NOTE(review): chdir to the already-current resolved directory is a no-op — presumably a leftover.
os.chdir(Path().resolve())
C:\Users\nadiy\anaconda3\Lib\site-packages\pandas\core\arrays\masked.py:61: UserWarning: Pandas requires version '1.3.6' or newer of 'bottleneck' (version '1.3.5' currently installed). from pandas.core import (
Define constraints and toggles for portfolio construction: liquidity filters, beta-neutralization, position caps, and leverage settings.
These guard against illiquid names, unintended factor exposures, and concentration risks.
# --- Risk-control toggles & params ---
# Liquidity screen (ADV = average dollar/daily volume percentile filter)
USE_ADV_ELIGIBILITY = True # True -> drop illiquid names by ADV percentile
ADV_LOOKBACK_DAYS = 60 # rolling days for ADV
ADV_PCT = 0.20 # keep names above this cross-sec percentile each date
# Beta-neutralization overlay
USE_BETA_NEUTRAL = False # True -> neutralize scores vs rolling beta each date
BETA_LOOKBACK_DAYS = 252 # rolling days for beta estimation
MIN_XS_NAMES = 10 # guard for per-date regressions
RIDGE_EPS = 1e-12 # small ridge for numerical safety
# Concentration limits
USE_PER_NAME_CAP = True # clip per-name weights at abs cap, then renormalize
PER_NAME_CAP = 0.02 # e.g.: 2% max per name (long-only)
BETA_NEUTRAL_STRENGTH = 1.0 # 1.0 full, 0.5 half, 0 off = skip overlay
# target gross exposure (sum |w|). With LONG_SHORT=True, make_positions already splits 50/50 long & short.
GROSS_LEVERAGE = 1.0 # set 2.0 for “1 long + 1 short” gross
This section defines reusable plotting functions to visualize model performance and signal quality.
plot_equity_curve(eq, title)
Plots the cumulative equity curve of a single strategy over time.
Useful to quickly visualize performance evolution.
plot_rolling_ic(ic, window, title)
Plots the rolling mean of the Information Coefficient (IC), which measures the rank correlation between model predictions and realized returns.
Indicates whether the signal has stable predictive power through time.
lead_lag_ic_profile_plotly(pred_df, df_prices, horizons, title, compute_rank_ic_fn)
Computes and plots IC across different lead/lag horizons.
bar_ic_means(ic_dict, title)
Plots average IC values for each horizon or lag, making it easy to compare signals.
multi_equity_plot(eq_dict, title)
Compares equity curves from multiple strategies in one chart (e.g., baseline ML vs RL).
Helps assess relative robustness and performance.
# --- Plotly helpers ---
import plotly.graph_objects as go
import plotly.io as pio
from IPython.display import display, HTML, clear_output
pd.options.display.float_format = '{:.4f}'.format  # 4-decimal display for all DataFrames
pio.templates.default = "simple_white"  # clean default Plotly theme
# Toggle: False to fall back to Matplotlib for any plot
USE_PLOTLY = True
def _fig_size(w=900, h=350): return dict(width=w, height=h)
def plot_equity_curve(eq: pd.Series, title="Strategy Equity Curve"):
    """Line chart of a cumulative equity curve (Plotly, or Matplotlib fallback)."""
    if not USE_PLOTLY:
        ax = eq.plot(figsize=(10, 4), title=title)
        ax.set_ylabel("Equity")
        return
    curve = go.Scatter(x=eq.index, y=eq.values, mode="lines")
    fig = go.Figure([curve])
    fig.update_layout(title=title, yaxis_title="Equity", showlegend=False, **_fig_size())
    fig.update_xaxes(showgrid=False)
    fig.show()
def plot_rolling_ic(ic: pd.Series, window=60, title=None):
    """Plot the rolling-mean rank IC over `window` days, with a zero reference line."""
    smoothed = ic.rolling(window).mean()
    title = title or f"{window}D Rolling Rank IC"
    if not USE_PLOTLY:
        ax = smoothed.plot(figsize=(10, 3), title=title)
        ax.axhline(0, lw=1)
        ax.set_ylabel("IC")
        return
    fig = go.Figure([go.Scatter(x=smoothed.index, y=smoothed.values, mode="lines")])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size())
    fig.show()
def lead_lag_ic_profile_plotly(
    pred_df,
    df_prices,
    horizons=range(-15,16),
    title="Lead/Lag IC profile (H in days)",
    compute_rank_ic_fn=None,
):
    """
    Compute the mean rank IC at each lead/lag horizon and plot it as a bar chart.

    Parameters: pred_df/df_prices are forwarded to `compute_rank_ic_fn`
    (looked up from globals() as `compute_rank_ic` when not supplied).
    Returns the per-horizon IC means as a pd.Series.

    Fix: the original evaluated `list(horizons)` twice (once for the iterable,
    once for tqdm's `total`); for a one-shot iterator the second pass is empty,
    so the progress total was 0. We materialize the horizons exactly once.
    """
    if compute_rank_ic_fn is None:
        compute_rank_ic_fn = globals().get("compute_rank_ic", None)
    if compute_rank_ic_fn is None:
        raise NameError("compute_rank_ic is not defined yet; define it or pass as compute_rank_ic_fn=...")
    hs = list(horizons)  # materialize once; safe for generators
    prof = {}
    for h in progress(hs, total=len(hs), desc="Lead/Lag IC"):
        ic_h = compute_rank_ic_fn(pred_df, df_prices, horizon=h, min_names=10, demean=True)
        # NaN when the horizon produced no valid cross-sections
        prof[h] = float(ic_h.mean()) if len(ic_h) else np.nan
    s = pd.Series(prof, name="IC")
    if not USE_PLOTLY:
        ax = s.plot(kind="bar", figsize=(10,3), title=title); ax.axhline(0, lw=1); ax.set_ylabel("IC"); return s
    fig = go.Figure([go.Bar(x=[int(i) for i in s.index], y=s.values)])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size(950,300))
    fig.show()
    return s
def bar_ic_means(ic_dict: dict, title="Per-lag mean IC"):
    """Bar chart of mean IC per key (horizon/lag), sorted by key; returns the Series."""
    means = pd.Series(ic_dict).sort_index()
    if not USE_PLOTLY:
        ax = means.plot(kind="bar", figsize=(10, 3), title=title)
        ax.axhline(0, lw=1)
        ax.set_ylabel("IC")
        return means
    labels = [str(k) for k in means.index]
    fig = go.Figure([go.Bar(x=labels, y=means.values)])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size(700, 300))
    fig.show()
    return means
def multi_equity_plot(eq_dict: dict, title="Equity curves"):
    """Overlay several named equity curves in one chart for side-by-side comparison."""
    if not USE_PLOTLY:
        ax = pd.concat(eq_dict, axis=1).plot(figsize=(10, 4), title=title)
        ax.set_ylabel("Equity")
        return
    fig = go.Figure()
    for label, curve in eq_dict.items():
        fig.add_trace(go.Scatter(x=curve.index, y=curve.values, mode="lines", name=label))
    fig.update_layout(title=title, yaxis_title="Equity", **_fig_size(950, 380))
    fig.show()
To monitor long-running loops and model training, we enable tqdm progress bars and create two helper functions
# --- Progress bars -----------------------
from contextlib import nullcontext
USE_TQDM = True # master toggle for all progress bars
# Import tqdm defensively: the helpers below degrade to no-ops when it is missing.
try:
    from tqdm.auto import tqdm as _tqdm
    try:
        from tqdm.contrib import tqdm_joblib as _tqdm_joblib
    except Exception:
        _tqdm_joblib = None  # tqdm present but joblib bridge unavailable
except Exception:
    _tqdm = None
    _tqdm_joblib = None
def progress(iterable, **kwargs):
    """
    Serial loops: wrap any iterable with a tqdm bar when available and enabled.
    Falls back to identity iterator (no bar).
    """
    if not (USE_TQDM and _tqdm is not None):
        return iterable
    kwargs.setdefault("dynamic_ncols", True)
    kwargs.setdefault("leave", False)
    return _tqdm(iterable, **kwargs)
def joblib_progress(total: int, desc: str = "Working"):
    """
    Parallel loops (joblib.Parallel): return a context manager that
    shows a single tqdm bar while tasks run. Falls back to a no-op.
    Usage:
        with joblib_progress(len(tasks), "Model variants"):
            results = Parallel(...)(...)
    """
    bar_available = USE_TQDM and _tqdm is not None and _tqdm_joblib is not None
    if not bar_available:
        return nullcontext()
    bar = _tqdm(total=total, desc=desc, dynamic_ncols=True, leave=True)
    return _tqdm_joblib(bar)
This section defines helper functions for loading and standardizing raw ticker data.
The goal is to normalize column names, ensure consistent formatting (dates, prices, volumes), and efficiently cache results.
sanitize_columns(cols)
Cleans messy column names by removing invisible characters, replacing special symbols (%, /, -, etc.), and enforcing unique, safe identifiers.
Example: "Adj Close (%)" → "Adj_Close_pct"
_infer_ticker_from_path(path)
Extracts the ticker symbol from a file name (e.g., "ABT_technical_cleaned.csv" → "ABT").
Fallback: uses the parent folder name if the filename is ambiguous.
load_csv(path)
Loads a single ticker CSV into a standardized DataFrame.
Steps: parse the date column (Date or Datetime), rename canonical columns (close, volume), and cast numerics to DTYPE (here float32).
load_all(root, force=False)
Recursively loads all CSVs from the given root folder:
If the parquet cache (CACHE_PARQUET) exists and force=False, it is loaded directly for speed; otherwise each file is loaded via load_csv(), concatenated, and saved to parquet. Returns a single DataFrame covering all tickers. # --- Loader utilities ---
def sanitize_columns(cols):
    """Normalize odd characters and produce safe, unique column identifiers."""
    cleaned = []
    for raw in cols:
        name = re.sub(r'[\u200b-\u200f\u202a-\u202e]', '', raw)  # strip zero-width & bidi marks
        # sequential substitutions (order matters: '%' -> 'pct' before separators)
        for old, new in (('%', 'pct'), ('/', '_'), ('\\', '_'),
                         ('(', '_'), (')', '_'), (',', '_'),
                         (' ', '_'), ('-', '_')):
            name = name.replace(old, new)
        name = re.sub(r'__+', '_', name)
        name = re.sub(r'[^0-9A-Za-z_]', '', name).strip('_')
        cleaned.append(name)
    # suffix repeats with _1, _2, ... keeping the first occurrence bare
    counts, unique = {}, []
    for name in cleaned:
        if name in counts:
            counts[name] += 1
            unique.append(f"{name}_{counts[name]}")
        else:
            counts[name] = 0
            unique.append(name)
    return unique
def _infer_ticker_from_path(path: Path) -> str:
"""ABT_technical_cleaned.csv -> ABT; fallback to parent folder name."""
stem = path.stem
tkr = stem.split('_', 1)[0].upper() if '_' in stem else path.parent.name.upper()
return tkr
def load_csv(path: Path) -> pd.DataFrame:
    """Load one ticker CSV with a consistent schema.

    Steps: sanitize column names, parse/normalize the date column, standardize
    the canonical price/volume column names, coerce every non-date column to
    numeric DTYPE, index by (deduped) date, and attach the inferred ticker.
    """
    df = pd.read_csv(path)
    df.columns = sanitize_columns(df.columns)
    # date parse & normalize (all files share the same layout; expect 'Date' or 'date')
    date_col_candidates = [c for c in df.columns if c.lower() in ('date', 'datetime')]
    assert date_col_candidates, f"No date column found in {path.name}"
    dcol = date_col_candidates[0]
    df[dcol] = pd.to_datetime(df[dcol], errors='coerce', utc=True)
    df = df.dropna(subset=[dcol]).sort_values(dcol)
    # strip timezone and keep midnight-stamped dates
    df[dcol] = df[dcol].dt.tz_localize(None).dt.normalize()
    # rename canonical price/volume (files already cleaned, so this is just standardization)
    if 'Close' in df.columns: # after sanitize, original 'Close' stays 'Close'
        df = df.rename(columns={'Close': 'close'})
    if 'close' not in df.columns and 'Close_1' in df.columns:
        # duplicate 'Close' columns get suffixed by sanitize_columns; fall back to the suffixed one
        df = df.rename(columns={'Close_1': 'close'})
    if 'Volume' in df.columns:
        df = df.rename(columns={'Volume': 'volume'})
    # numeric coercion for all non-date columns, then cast to DTYPE
    for c in df.columns:
        if c == dcol:
            continue
        if df[c].dtype == 'object':
            # strip thousands separators before numeric coercion
            df[c] = df[c].astype(str).str.replace(',', '', regex=False)
        df[c] = pd.to_numeric(df[c], errors='coerce').astype(DTYPE, copy=False)
    # index & dedup (keep the most recent row per date)
    df = df.set_index(dcol)
    df = df[~df.index.duplicated(keep='last')]
    # attach ticker
    df['ticker'] = _infer_ticker_from_path(path)
    return df
def load_all(root: Path, force: bool = False) -> pd.DataFrame:
    """
    Recursively load all CSVs under DATA_ROOT into a single DataFrame,
    cache to parquet for fast reloads.
    """
    if CACHE_PARQUET.exists() and not force:
        # fast path: reuse the cached combined dataset
        return pd.read_parquet(CACHE_PARQUET)
    paths = sorted(root.rglob('*.csv'))
    if not paths:
        raise FileNotFoundError(f"No CSV files found under {root.resolve()}")
    loaded = [load_csv(p) for p in progress(paths, desc="Load CSVs")]
    combined = pd.concat(loaded, axis=0).sort_index()
    combined.to_parquet(CACHE_PARQUET)
    return combined
Now that our setup is complete and all helper functions are defined, we can load all raw ticker data into a unified DataFrame, restrict to the analysis window, and run basic quality checks.
Load data
load_all(DATA_ROOT, force=True) loads all CSVs and forces a fresh parquet build; force=False reuses the parquet cache for faster reload.
Restrict to analysis window
Slice to START_DATE : END_DATE (2014–2024) and convert ticker to a categorical type (saves memory, safer group ops).
Define feature list
base_features: all columns except ticker and close. tickers: list of all available tickers.
Sanity checks & coverage
Winsorization helper
winsorize_clip() will be used later to clip feature distributions at quantiles (reduce outlier impact). # Load (force=False in normal runs to use the parquet cache)
df = load_all(DATA_ROOT, force=True)  # force=True -> rebuild the parquet cache from raw CSVs
# Restrict to analysis window and normalize/index guarantees
df = df.loc[START_DATE:END_DATE].sort_index()
assert 'close' in df.columns, "'close' column missing after load"
assert getattr(df.index, "tz", None) is None, "Index should be tz-naive"
df.index = pd.to_datetime(df.index).normalize() # idempotent
df.index.name = "date"
df['ticker'] = df['ticker'].astype('category') # memory + safer ops
# Feature list BEFORE targets are added (keeps naming clear later)
base_features = [c for c in df.columns if c not in ['ticker', 'close']]
tickers = df['ticker'].cat.categories.tolist()
print(f"Rows: {len(df):,} | Tickers: {len(tickers)} | Raw features: {len(base_features)}")
print("Span:", df.index.min(), "→", df.index.max())
# Missingness overview: share of NaNs per raw feature, worst offenders first
miss = df[base_features].isna().mean().sort_values(ascending=False)
print("NaN% (worst 10)\n", miss.head(10))
# coverage sanity per ticker (first/last date and row count)
cov = (df.reset_index()
         .groupby('ticker', as_index=False)
         .agg(first_date=('date', 'min'),
              last_date =('date', 'max'),
              rows =('date', 'size'))
         .sort_values('rows', ascending=False))
print("\nPer-ticker coverage (top 10 by rows):")
print(cov.head(10))
# Winsorization helper (used later inside pipelines)
def winsorize_clip(X: pd.DataFrame, lo=0.005, hi=0.995):
    """Clip each column of X to its own [lo, hi] quantile range (outlier damping)."""
    lower_q = X.quantile(lo)
    upper_q = X.quantile(hi)
    # bounds are per-column Series; axis=1 aligns them on columns
    return X.clip(lower=lower_q, upper=upper_q, axis=1)
Load CSVs: 0%| | 0/650 [00:00<…
Rows: 164,404 | Tickers: 60 | Raw features: 81 Span: 2014-01-01 00:00:00 → 2024-12-30 00:00:00 NaN% (worst 10) Visitor_Trend 0.3558 MA_Neg_Vol_255_ma 0.0199 KlingerSignal_Klinger_13_34_55 0.0155 Klinger_Klinger_13_34_55 0.0154 Klinger_13_34_55_hist 0.0154 Result_Vol_ROC_14 0.0076 Result_Chaikin_Vol_14_2_ma 0.0075 Result_Mass_Idx_25_27 0.0072 Result_Intraday_Mtm_20 0.0070 Result_M_Flow_14 0.0049 dtype: float64 Per-ticker coverage (top 10 by rows): ticker first_date last_date rows 40 AVB 2014-01-01 2024-12-30 2785 48 BAX 2014-01-01 2024-12-30 2785 43 AXON 2014-01-01 2024-12-30 2785 0 A 2014-01-01 2024-12-30 2784 44 AXP 2014-01-01 2024-12-30 2784 32 AOS 2014-01-01 2024-12-30 2784 33 APA 2014-01-01 2024-12-30 2784 34 APD 2014-01-01 2024-12-30 2784 35 APH 2014-01-01 2024-12-30 2784 36 APO 2014-01-01 2024-12-30 2784
We build forward-return targets per ticker for horizons in TARGET_HORIZONS, then keep only rows where at least one target exists. Feature/target separation and basic guards included.
# --- Target engineering ---
def add_targets(d: pd.DataFrame, horizons: List[int]) -> pd.DataFrame:
    """Append per-ticker forward log-return columns fwd_ret_{h}d for each horizon."""
    d = d.copy()
    d["log_close"] = np.log(d["close"])
    n_tkr = d['ticker'].nunique()
    pieces = []
    grouped = d.groupby("ticker", group_keys=False)
    for _, grp in progress(grouped, total=n_tkr, desc="Build forward targets"):
        for h in horizons:
            # forward h-day log return: log(P[t+h]) - log(P[t]); tail rows become NaN
            grp[f"fwd_ret_{h}d"] = grp["log_close"].shift(-h) - grp["log_close"]
        pieces.append(grp)
    return pd.concat(pieces)
# Build forward targets
df = add_targets(df, TARGET_HORIZONS)
# Columns: names of the forward-return targets just added
target_cols = [f"fwd_ret_{h}d" for h in TARGET_HORIZONS]
# Everything that must never appear in the feature matrix
exclude_cols = set(["ticker", "close", "log_close"] + target_cols)
feature_cols = [c for c in df.columns if c not in exclude_cols]
# Basic guards
assert len(feature_cols) > 0, "No features left after exclusion!"
assert not (set(feature_cols) & exclude_cols), "Feature/target column overlap."
# Keep rows where at least one target exists (drop only tail per ticker)
df = df[df[target_cols].notna().any(axis=1)].copy()
print("Features after target add:", len(feature_cols))
print("Shape after dropping all-NA target rows:", df.shape)
Build forward targets: 0%| | 0/60 [00:00<…
Features after target add: 81 Shape after dropping all-NA target rows: (163987, 86)
We enforce an as-of principle: features used at time t must be known strictly before the prediction target.
Lag features
Apply a global lag (ASOF_LAG_DAYS=1) to all features within each ticker.
Drop incomplete rows
Remove unstable features
Finalize types
Cast features to float32. Result: df_asof, the fully lagged and cleaned dataset. # As-of feature freeze (global 1D lag)
ASOF_LAG_DAYS = 1  # features at date t reflect information available at t-1
TARGET = f"fwd_ret_{TARGET_HORIZONS[0]}d"  # primary target: first horizon (5-day forward return)
df_asof = df.copy()
# Shift features forward by one day within each ticker so they are strictly "as of" the prior day.
df_asof[feature_cols] = df_asof.groupby('ticker', group_keys=False)[feature_cols].shift(ASOF_LAG_DAYS)
# Drop rows missing either target or any feature
df_asof = df_asof.dropna(subset=[TARGET] + feature_cols, how='any')
# Optional: drop cross-sectionally constant-by-day features (speed + stability)
cs_nuniq = df_asof.groupby(level=0)[feature_cols].nunique()  # distinct values per (date, feature)
share_const = (cs_nuniq <= 1).mean(axis=0).sort_values(ascending=False)  # share of dates a feature is constant
CONST_THRESH = 0.80
const_like = share_const[share_const > CONST_THRESH].index.tolist()
feature_cols = [c for c in feature_cols if c not in const_like]
df_asof = df_asof.dropna(subset=feature_cols + [TARGET]).copy()
# dtype: downcast features to float32 for memory
for c in feature_cols:
    df_asof[c] = df_asof[c].astype(DTYPE)
print(f"[asof] df_asof: {df_asof.shape} | kept features: {len(feature_cols)} (dropped {len(const_like)})")
[asof] df_asof: (102915, 86) | kept features: 81 (dropped 0)
We impose two safeguards before modeling:
1) Minimum history requirement
A name must have at least MIN_HIST_DAYS (=252) days of history after the as-of lag to be eligible for scoring.
This stabilizes rolling stats (e.g., z-scores, betas) and avoids cold-start noise.
2) Leakage drop-list
Certain features are known/assumed to leak future information (directly or via look-ahead preprocessing).
We drop them from both the feature list and the training view to prevent optimistic bias.
Outputs
df_asof: filtered to names with sufficient history and without leaky fields. feature_cols: updated to exclude leaky features. # Require a minimum lookback before a name can be scored
MIN_HIST_DAYS = 252  # ~1 trading year of history required before a name is scored
# Reset the index to a 'date' column (defensive branch in case the index is unnamed).
tmp = df_asof.reset_index().rename(columns={"index":"date"}) if df_asof.index.name is None else df_asof.reset_index()
tmp["first_date"] = tmp.groupby("ticker")["date"].transform("min")
tmp["days_live"] = (tmp["date"] - tmp["first_date"]).dt.days  # calendar days since first observation
df_asof = tmp[tmp["days_live"] >= MIN_HIST_DAYS].drop(columns=["first_date","days_live"]).set_index("date")
# --- leakage drop-list (8 features) ---
# Fields known/assumed to embed look-ahead information (directly or via preprocessing).
LEAKY_FEATURES = [
    "Result_Detrended_14_ma",
    "MA_Neg_Vol_255_ma",
    "STARC_Bands_Bottom_STARC_Bands_15_5_13_y",
    "STARC_Bands_Median_STARC_Bands_15_5_13_y",
    "STARC_Bands_Top_STARC_Bands_15_5_13_y",
    "Result_Hist_Vol_10_252_1",
    "PMOSignal_PMO_35_20_10",
    "Gator_13_8_8_5_5_3_hist1",
]
# Drop safely from feature list and training view
existing = [c for c in LEAKY_FEATURES if c in df_asof.columns]
missing = sorted(set(LEAKY_FEATURES) - set(existing))
feature_cols = [c for c in feature_cols if c not in LEAKY_FEATURES]
df_asof = df_asof.drop(columns=LEAKY_FEATURES, errors="ignore").copy()
print(f"[leak-drop] dropped {len(existing)}/{len(LEAKY_FEATURES)} present → {existing}")
if missing:
    print("[leak-drop] not present (already absent):", missing)
# Safety
assert not (set(feature_cols) & set(LEAKY_FEATURES))
assert df_asof[feature_cols + [TARGET]].notna().all().all(), "Found NA after drop."
print(f"[asof] df_asof now: {df_asof.shape} | kept features: {len(feature_cols)}")
[leak-drop] dropped 8/8 present → ['Result_Detrended_14_ma', 'MA_Neg_Vol_255_ma', 'STARC_Bands_Bottom_STARC_Bands_15_5_13_y', 'STARC_Bands_Median_STARC_Bands_15_5_13_y', 'STARC_Bands_Top_STARC_Bands_15_5_13_y', 'Result_Hist_Vol_10_252_1', 'PMOSignal_PMO_35_20_10', 'Gator_13_8_8_5_5_3_hist1'] [asof] df_asof now: (92911, 78) | kept features: 73
We implement walk-forward (expanding window) cross-validation, which is more realistic for time series than random splits.
The expanding training window starts at TRAIN_YEARS (default = 3 years). EMBARGO_DAYS ensures a gap between train and test (avoids leakage from target horizons). STEP_MONTHS controls how far the training window advances each iteration (default = 1 month). WFWindow dataclass: stores train/test start and end dates. wf_windows: full list of folds. wf_windows_run: subset of folds (optionally truncated by N_FOLDS_LIMIT). # Walk-forward windows
from dataclasses import dataclass
from dateutil.relativedelta import relativedelta
@dataclass
class WFWindow:
    """One walk-forward fold: expanding training span plus its out-of-sample test month."""
    train_start: pd.Timestamp
    train_end: pd.Timestamp
    test_start: pd.Timestamp
    test_end: pd.Timestamp
def generate_wf_windows(dates: pd.DatetimeIndex,
                        train_years: int = TRAIN_YEARS,
                        step_months: int = STEP_MONTHS,
                        embargo_days: int = EMBARGO_DAYS):
    """
    Expanding training window; test on the next calendar month.
    Embargo between train_end and test_start to avoid look-ahead via target horizon.

    Yields WFWindow(train_start, train_end, test_start, test_end).

    Fix: the original could yield the SAME test month twice in consecutive
    folds (two embargoed cursors rolling forward into one month begin),
    duplicating test predictions downstream. Folds whose test window repeats
    the previously yielded one are now skipped.
    """
    dmin, dmax = dates.min(), dates.max()
    start_train = dmin
    start_end = start_train + relativedelta(years=train_years) - pd.Timedelta(days=1)
    cursor = start_end + pd.Timedelta(days=embargo_days)
    prev_test = None  # (test_start, test_end) of the last yielded fold
    while cursor <= dmax - pd.tseries.offsets.MonthEnd(0):
        # MonthBegin(0) rolls the embargoed cursor forward to the next month start
        test_start = (cursor + pd.offsets.MonthBegin(0)).normalize()
        test_end = (test_start + pd.offsets.MonthEnd(0))
        if (test_start, test_end) != prev_test:
            yield WFWindow(start_train, start_end, test_start, test_end)
            prev_test = (test_start, test_end)
        start_end += pd.offsets.MonthEnd(step_months)
        cursor = start_end + pd.Timedelta(days=embargo_days)
# Build windows on the AS-OF dataset
unique_dates = pd.DatetimeIndex(sorted(df_asof.index.unique())).normalize()
wf_windows = list(generate_wf_windows(unique_dates, TRAIN_YEARS, STEP_MONTHS, EMBARGO_DAYS))
N_FOLDS_LIMIT = None  # set an int to truncate folds for quick experiments
wf_windows_run = wf_windows if not N_FOLDS_LIMIT else wf_windows[:N_FOLDS_LIMIT]
print("WF folds:", len(wf_windows_run))
wf_windows_run[:3]
WF folds: 43
[WFWindow(train_start=Timestamp('2018-06-14 00:00:00'), train_end=Timestamp('2021-06-13 00:00:00'), test_start=Timestamp('2021-08-01 00:00:00'), test_end=Timestamp('2021-08-31 00:00:00')),
WFWindow(train_start=Timestamp('2018-06-14 00:00:00'), train_end=Timestamp('2021-06-30 00:00:00'), test_start=Timestamp('2021-08-01 00:00:00'), test_end=Timestamp('2021-08-31 00:00:00')),
WFWindow(train_start=Timestamp('2018-06-14 00:00:00'), train_end=Timestamp('2021-07-31 00:00:00'), test_start=Timestamp('2021-09-01 00:00:00'), test_end=Timestamp('2021-09-30 00:00:00'))]
This section defines the train/test splitter, model pipelines, and two WF runners:
_split_wf(df_view, w, embargo_bd, asof_lag)
Creates train/test slices for one walk-forward window w: it caps the effective training end below the test start (accounting for asof_lag) and backs off embargo_bd business days to avoid leakage via target computation.
build_pipeline(model)
Builds a preprocessing + estimator pipeline for "elasticnet" (with grid for alpha, l1_ratio) or "ridge" (grid for alpha); returns (pipeline, param_grid) for tuning.
fit_predict_wf_fast_asof(...)
Fast baseline using fixed Ridge(alpha) (no CV). For each fold: _split_wf → fit → predict on the test month; returns a DataFrame with y_pred, ticker, wf_fold, model.
fit_predict_wf_asof(...)
Tuned variant with RandomizedSearchCV over the model grid using TimeSeriesSplit (3 splits) on the training chunk of each fold (n_iter × CV_splits tasks per fold).
Notes
Both runners operate on the as-of dataset (df_asof, feature_cols, TARGET) and use wf_windows_run created earlier for consistent fold definitions. from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge, ElasticNet
from sklearn.model_selection import RandomizedSearchCV, TimeSeriesSplit
from pandas.tseries.offsets import BDay
def _split_wf(df_view, w, embargo_bd=MAX_H, asof_lag=ASOF_LAG_DAYS):
    """Slice one walk-forward fold into (train, test, effective_train_end).

    The training window is capped just below the test start (minus the as-of
    lag) and then backed off `embargo_bd` business days so targets computed
    near the boundary cannot overlap the test period.
    """
    # Ceiling: stop training before the first test observation and before target horizon
    ceiling = w.test_start - BDay(asof_lag + 1) # nothing from (t-1) bleeding into test
    train_end_eff = min(w.train_end, ceiling) - BDay(max(1, embargo_bd))
    tr = df_view.loc[(df_view.index >= w.train_start) & (df_view.index <= train_end_eff)]
    te = df_view.loc[(df_view.index >= w.test_start) & (df_view.index <= w.test_end)]
    return tr, te, train_end_eff
def build_pipeline(model: str = "elasticnet"):
    """Return (pipeline, param_grid) for the requested linear model family.

    Raises ValueError for any model name other than 'elasticnet' or 'ridge'.
    """
    preprocess = Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler(with_mean=True, with_std=True)),
    ])
    if model == "elasticnet":
        estimator = ElasticNet(random_state=RANDOM_STATE, max_iter=2000)
        grid = {
            "est__alpha": np.logspace(-4, 1, 50),
            "est__l1_ratio": np.linspace(0.0, 1.0, 21),
        }
    elif model == "ridge":
        estimator = Ridge(random_state=RANDOM_STATE)
        grid = {"est__alpha": np.logspace(-4, 2, 20)}
    else:
        raise ValueError("model must be 'elasticnet' or 'ridge'")
    full = Pipeline([("pre", preprocess), ("est", estimator)])
    return full, grid
def fit_predict_wf_fast_asof(df_view: pd.DataFrame,
                             feature_cols: list,
                             target_col: str,
                             windows=None,
                             alpha: float = 1.0) -> pd.DataFrame:
    """Fast walk-forward baseline: fixed-alpha Ridge, no hyperparameter search.

    For each fold: split via _split_wf, fit on the training slice, predict the
    test month. Returns a date-indexed DataFrame with columns
    ticker / y_pred / wf_fold / model. Empty folds are skipped with a message.
    """
    if windows is None: windows = wf_windows_run
    # A single pipeline object is reused; pipe.fit() refits it from scratch per fold.
    preds, pipe = [], Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler()),
        ("est", Ridge(alpha=alpha, random_state=RANDOM_STATE)),
    ])
    for k, w in enumerate(progress(windows, total=len(windows),
                                   desc=f"WF ridge_fast [{target_col}]"), 1):
        tr, te, tr_end_eff = _split_wf(df_view, w)
        if te.empty:
            print(f"Fold {k:02d} skipped (te=0)");
            continue
        # Drop training rows lacking the target (tail of each ticker).
        tr = tr.dropna(subset=[target_col])
        X_tr, y_tr = tr[feature_cols], tr[target_col]
        X_te = te[feature_cols]
        if len(X_tr)==0 or len(X_te)==0:
            print(f"Fold {k:02d} skipped (tr={len(X_tr)}, te={len(X_te)})")
            continue
        pipe.fit(X_tr, y_tr)
        yhat = pipe.predict(X_te)
        out = pd.DataFrame(
            {"ticker": te["ticker"].values, "y_pred": yhat,
             "wf_fold": k, "model": "ridge_fast"},
            index=te.index
        ); out.index.name = "date"
        preds.append(out)
        print(f"Fold {k:02d} | Train {w.train_start.date()}→{pd.Timestamp(tr_end_eff).date()} "
              f"| Test {w.test_start.date()}→{w.test_end.date()} | est={pipe.named_steps['est']}")
    return pd.concat(preds).sort_index()
def fit_predict_wf_asof(df_view: pd.DataFrame,
                        feature_cols: list,
                        target_col: str,
                        model: str = "elasticnet",
                        n_iter: int = 25,
                        windows=None) -> pd.DataFrame:
    """Tuned walk-forward runner: per-fold RandomizedSearchCV over the model grid.

    Uses TimeSeriesSplit(3) inside each fold's training chunk (time-ordered,
    no shuffling). Returns a date-indexed DataFrame with columns
    ticker / y_pred / wf_fold / model.
    """
    if windows is None: windows = wf_windows_run
    pipe, params = build_pipeline(model)
    preds = []
    for k, w in enumerate(progress(windows, total=len(windows),
                                   desc=f"WF {model} [n_iter={n_iter}]"), 1):
        tr, te, tr_end_eff = _split_wf(df_view, w)
        if te.empty:
            print(f"Fold {k:02d} skipped (te=0)");
            continue
        tr = tr.dropna(subset=[target_col])
        X_tr, y_tr = tr[feature_cols], tr[target_col]
        X_te = te[feature_cols]
        cv = TimeSeriesSplit(n_splits=3)
        tuner = RandomizedSearchCV(
            pipe, params, n_iter=n_iter, random_state=RANDOM_STATE,
            n_jobs=-1, cv=cv,
            scoring="neg_mean_squared_error", verbose=0
        )
        # One progress bar spanning all n_iter x n_splits parallel fit tasks.
        total_tasks = n_iter * cv.get_n_splits()
        with joblib_progress(total_tasks, desc=f"RS[{model}] fold {k:02d}"):
            tuner.fit(X_tr, y_tr)
        out = te[["ticker"]].copy()
        out["y_pred"] = tuner.predict(X_te)
        out["wf_fold"] = k
        out["model"] = model
        out.index.name = "date"
        preds.append(out)
        print(f"Fold {k:02d} | Train {w.train_start.date()}→{pd.Timestamp(tr_end_eff).date()} "
              f"| Test {w.test_start.date()}→{w.test_end.date()} | best={tuner.best_params_}")
    return pd.concat(preds).sort_index()
This section converts model predictions into portfolio weights, simulates trades, and evaluates performance.
_cap_and_renorm(row, cap, long_short)
Applies a per-name weight cap and renormalizes weights:
make_positions(signal_df, ...)
Converts daily model scores (y_pred) into cross-sectional weights:
q fraction (and bottom if long/short). compute_returns_matrix(d)
Converts (date, ticker, close) into a date × ticker matrix of simple returns, de-duplicating per (date,ticker).
backtest(signal_pred, prices_df, ...)
Simulates trading given predictions and prices:
make_positions. perf_stats(ts, freq)
Computes key performance metrics from a return series:
def _cap_and_renorm(row: pd.Series, cap: float, long_short: bool) -> pd.Series:
if cap is None or cap <= 0:
return row
w = row.copy()
if long_short:
# 1) hard box cap on both sides
w = w.clip(lower=-cap, upper=cap)
# 2) side-wise sums after capping
pos_sum = w[w > 0].sum()
neg_sum = -w[w < 0].sum()
# 3) scale DOWN only, never up (keeps the cap hard)
target_side = 0.5
if pos_sum > 0:
w.loc[w > 0] *= min(target_side / pos_sum, 1.0)
if neg_sum > 0:
w.loc[w < 0] *= min(target_side / neg_sum, 1.0)
return w
else:
# long-only: hard cap then scale DOWN to gross=1
w = w.clip(lower=0.0, upper=cap)
gross = w.sum()
if gross > 0:
w *= min(1.0 / gross, 1.0)
return w
def make_positions(
    signal_df: pd.DataFrame,
    long_short: bool = LONG_SHORT,
    top_q: float = TOP_QUANTILE,
    freq: str = REBALANCE_FREQ,
    gross_leverage: float = 1.0,
    eligibility: pd.DataFrame | None = None,
    per_name_cap: float | None = None,
    debug: bool = False
) -> pd.DataFrame:
    """
    Build cross-sectional weights from daily scores without backfilling future signals.
    Optionally enforce ADV eligibility per date.

    signal_df must carry a 'ticker' column and a 'y_pred' score column, indexed
    by date. Returns a rebalance-date x ticker weight matrix. With
    long_short=True, gross is split +/- gross_leverage/2 across k names per side.
    """
    # Resolve the per-name cap from module-level toggles when not passed explicitly.
    if per_name_cap is None:
        per_name_cap = (globals().get('PER_NAME_CAP')
                        if globals().get('USE_PER_NAME_CAP', False) else None)
    sig = (signal_df[['ticker', 'y_pred']]
           .rename(columns={'y_pred': 'score'})
           .copy())
    # Normalize the index to tz-naive midnight dates.
    if getattr(sig.index, "tz", None) is not None:
        sig.index = sig.index.tz_localize(None)
    sig.index = pd.to_datetime(sig.index).normalize()
    sig = sig.dropna(subset=['score'])
    if debug:
        print("[make_positions] pred rows:", len(sig),
              "| unique dates:", sig.index.nunique(),
              "| range:", sig.index.min(), "->", sig.index.max(),
              "| tickers:", sig['ticker'].nunique())
    # Wide date x ticker score matrix; last duplicate per (date,ticker) wins.
    scores_daily = (sig.reset_index(names='date')
                    .sort_values(['date','ticker'])
                    .drop_duplicates(['date','ticker'], keep='last')
                    .pivot(index='date', columns='ticker', values='score')
                    .astype(float)
                    .sort_index())
    # Downsample to the rebalance calendar using the last available daily score.
    scores = scores_daily.resample(freq).last().dropna(how="all")
    if debug:
        valid_days = int((scores.notna().sum(axis=1) > 0).sum())
        print("[make_positions] scores shape:", scores.shape,
              "| valid rebalance days:", valid_days)
    # ranks (1=best); build weights with optional eligibility mask
    w = pd.DataFrame(0.0, index=scores.index, columns=scores.columns)
    for dt, row in scores.iterrows():
        r = row.dropna()
        # Restrict to names flagged eligible on this date (missing -> ineligible).
        if eligibility is not None and dt in eligibility.index:
            elig_row = eligibility.loc[dt].reindex(r.index)
            r = r[elig_row.fillna(False)]
        if len(r) == 0:
            continue
        # Names per side for this date (at least 1).
        k = max(1, int(round(len(r) * top_q)))
        ranks = r.rank(ascending=False, method='first')
        top = ranks.nsmallest(k).index
        if long_short:
            bot = ranks.nlargest(k).index
            w.loc[dt, top] = +gross_leverage / (2*k)
            w.loc[dt, bot] = -gross_leverage / (2*k)
        else:
            w.loc[dt, top] = gross_leverage / k
        # Per-name cap + side-wise renormalization (shrink-only).
        if per_name_cap is not None and per_name_cap > 0:
            w.loc[dt] = _cap_and_renorm(w.loc[dt], per_name_cap, long_short)
    return w
def compute_returns_matrix(d: pd.DataFrame) -> pd.DataFrame:
    """
    Convert (date, ticker, close) rows into a date x ticker matrix of SIMPLE returns.
    De-duplicates (date,ticker) by taking the last row.
    """
    flat = d.reset_index(names='date')
    flat = (flat.sort_values(['date', 'ticker'])
                .drop_duplicates(['date', 'ticker'], keep='last'))
    px = flat.pivot(index='date', columns='ticker', values='close')
    # normalize the index to tz-naive midnight timestamps
    if getattr(px.index, "tz", None) is not None:
        px.index = px.index.tz_localize(None)
    px.index = pd.to_datetime(px.index).normalize()
    returns = px.pct_change().fillna(0.0)  # simple returns; first row -> 0
    return returns.sort_index()
def backtest(
    signal_pred: pd.DataFrame,
    prices_df: pd.DataFrame,
    long_short: bool = LONG_SHORT,
    top_q: float = TOP_QUANTILE,
    freq: str = REBALANCE_FREQ,
    holding_period: int = HOLDING_PERIOD,
    tcost_bps: float = TCOST_BPS,
    eligibility: pd.DataFrame | None = None,
    per_name_cap: float | None = None,
    gross_leverage: float = 1.0,
    debug: bool = False
):
    """
    Align predictions to daily returns, trade next day, and hold each rebalance for
    `holding_period` days via rolling mean of weights (staggered overlap).

    Parameters
    ----------
    signal_pred : predictions indexed by date (forwarded to make_positions).
    prices_df : tidy prices indexed by date with columns ['ticker', 'close'].
    eligibility : optional boolean (date x ticker) tradability mask, forwarded
        to make_positions.
    per_name_cap : optional per-name weight cap, forwarded to make_positions.
    tcost_bps : transaction cost in basis points charged per unit of daily turnover.

    Returns
    -------
    (out, w_held) : out has columns ['ret', 'turnover', 'equity'] indexed by
        date; w_held is the daily held-weight matrix actually used for P&L.
    """
    # positions on rebalance calendar
    w_rb = make_positions(signal_pred, long_short=long_short, top_q=top_q,
                          freq=freq, gross_leverage=gross_leverage, eligibility=eligibility,
                          per_name_cap=per_name_cap, debug=debug)
    # daily returns matrix
    rets = compute_returns_matrix(prices_df)
    # align: forward-fill positions to daily dates, then select only return dates
    weights = (w_rb
               .reindex(w_rb.index.union(rets.index))
               .sort_index()
               .ffill()
               .reindex(rets.index))
    # common universe
    common = rets.columns.intersection(weights.columns)
    if debug:
        print("[backtest] columns | rets:", len(rets.columns),
              "weights:", len(weights.columns), "common:", len(common))
        print("[backtest] weights_w first date:", w_rb.index.min(),
              "| after-align nonzero rows:",
              int((weights[common].abs().sum(axis=1) > 1e-12).sum()))
        if w_rb.index.min() in weights.index:
            print("[backtest] sum |w| at first rebalance:",
                  float(weights.loc[w_rb.index.min(), common].abs().sum()))
    weights = weights[common].fillna(0.0)
    rets = rets[common].fillna(0.0)
    # enter next day; approximate H-day hold with rolling mean
    w_held = weights.shift(1).rolling(holding_period).mean().fillna(0.0)
    # mask out pre-deployment days (before first nonzero)
    mask_live = (w_held.abs().sum(axis=1) > 1e-12)
    if mask_live.any():
        first_live = mask_live.idxmax()  # first True
        w_held = w_held.loc[first_live:]
        rets = rets.loc[first_live:]
    # turnover & costs
    turnover = w_held.sub(w_held.shift(1).fillna(0.0)).abs().sum(axis=1)
    cost = turnover * (tcost_bps / 1e4)
    # portfolio simple return
    port_ret = (w_held * rets).sum(axis=1) - cost
    equity = (1.0 + port_ret.fillna(0.0)).cumprod()
    out = pd.DataFrame({'ret': port_ret, 'turnover': turnover, 'equity': equity})
    out.index.name = 'date'
    return out, w_held
def perf_stats(ts: pd.Series, freq: int = 252) -> dict:
    """
    Summary performance statistics for a (daily) simple-return series.

    Returns a dict with CAGR, annualized Vol, Sharpe (zero risk-free rate),
    MaxDD, and sample size N. Empty / all-NaN input yields NaN stats with N=0.
    """
    rets = ts.dropna()
    n = len(rets)
    if n == 0:
        return dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan, N=0)
    ann_ret = (1 + rets).prod() ** (freq / n) - 1
    sd = rets.std()
    ann_vol = sd * np.sqrt(freq)
    sharpe = (rets.mean() / sd) * np.sqrt(freq) if sd > 0 else np.nan
    wealth = (1 + rets).cumprod()
    drawdown = wealth / wealth.cummax() - 1
    return dict(CAGR=ann_ret, Vol=ann_vol, Sharpe=sharpe, MaxDD=drawdown.min(), N=n)
This section builds two overlays that can be applied during portfolio construction.
build_adv_eligibility(df_prices, lookback=60, pct=0.20) — computes an ADV-based eligibility mask (date × ticker → boolean):
- ADV = rolling mean of close × volume over `lookback` days, shifted by 1 day so day-t eligibility never uses day-t information.
- Names at or above the per-date `pct`-th ADV percentile are eligible (the default screens out the bottom 20% least-liquid names).
build_rolling_beta(df_prices, lookback=252) — computes rolling betas of each ticker to an equal-weight market return:
- rolling covariance / variance over `lookback` days, shifted by 1 day to use info only up to t-1.
neutralize_vs_beta(pred_df, betas, min_names, strength) — removes unintended market beta exposure from daily scores:
- regresses scores (y_pred) on betas cross-sectionally and takes residuals; `min_names` guards against thin cross-sections; `strength` (0..1) scales the neutralization (1 = full); a small ridge term (RIDGE_EPS) stabilizes low-variance cases. Returns (date, ticker, y_pred) with residualized scores.
Usage notes:
- pass the eligibility mask into make_positions(...) to skip illiquid names.
- apply neutralize_vs_beta(...) to pred_df before turning scores into weights if USE_BETA_NEUTRAL=True.
# --- Risk-control helpers (ADV eligibility & beta-neutral overlay) ---
def build_adv_eligibility(df_prices: pd.DataFrame,
                          lookback: int = 60,
                          pct: float = 0.20) -> pd.DataFrame:
    """
    Boolean (date x ticker) eligibility mask based on average dollar volume.

    A name is eligible on day t when its trailing ADV (rolling mean of
    close*volume over `lookback` days, shifted one day so no day-t info is
    used) is at or above that day's `pct`-th cross-sectional percentile,
    i.e. the least-liquid `pct` fraction of names is screened out.
    Days without a valid ADV (warm-up period) are marked ineligible.
    """
    tidy = (df_prices.reset_index(names='date')
            .drop_duplicates(['date', 'ticker'], keep='last'))

    def _wide(col):
        return tidy.pivot(index='date', columns='ticker', values=col)

    dollar_vol = (_wide('close') * _wide('volume')).astype(float)
    adv = dollar_vol.rolling(lookback).mean().shift(1)  # leak-safe: info through t-1 only
    cutoff = adv.apply(lambda r: np.nanpercentile(r.values, pct * 100), axis=1)
    mask = adv.ge(cutoff, axis=0) & adv.notna()
    mask.index.name = 'date'
    return mask
def build_rolling_beta(df_prices: pd.DataFrame,
                       lookback: int = 252) -> pd.DataFrame:
    """
    Rolling beta of each ticker versus the equal-weight market return.

    beta_t = rolling_cov(ret_i, mkt) / (rolling_var(mkt) + RIDGE_EPS), then
    shifted one day so day-t betas only use information through t-1.
    Returns a (date x ticker) float DataFrame.
    """
    prices = (df_prices.reset_index(names='date')
              .drop_duplicates(['date', 'ticker'], keep='last')
              .pivot(index='date', columns='ticker', values='close'))
    daily = prices.pct_change()
    market = daily.mean(axis=1)  # equal-weight benchmark
    mkt_var = market.rolling(lookback).var()
    # rolling covariance per column vs the market, ridge-stabilized denominator
    cols = {
        tkr: daily[tkr].rolling(lookback).cov(market) / (mkt_var + RIDGE_EPS)
        for tkr in daily.columns
    }
    out = pd.DataFrame(cols, index=daily.index).astype(float).shift(1)
    out.index.name = 'date'
    return out
def neutralize_vs_beta(pred_df: pd.DataFrame,
                       betas: pd.DataFrame,
                       min_names: int = MIN_XS_NAMES,
                       strength: float = 1.0) -> pd.DataFrame:
    """
    Remove beta exposure from scores via a per-date cross-sectional regression.

    For each date, scores are regressed on betas across tickers and replaced
    by (demeaned) residuals; `strength` in [0, 1] scales how much of the
    fitted beta component is removed (1 = full neutralization). Dates with
    fewer than `min_names` valid (score, beta) pairs are dropped. Duplicate
    (date, ticker) rows in `pred_df` are resolved by keeping the last.

    Returns a DataFrame indexed by date with columns ['ticker', 'y_pred'].
    """
    s_tidy = (pred_df.reset_index().rename(columns={'index':'date'})
              .sort_values(['date','ticker'])
              .drop_duplicates(['date','ticker'], keep='last'))
    s = s_tidy.pivot_table(index='date', columns='ticker', values='y_pred', aggfunc='last')
    # align betas to score matrix
    b = betas.reindex(s.index).reindex(columns=s.columns)
    def _resid_row(y, x):
        y = y.astype(float).copy()
        x = x.astype(float).copy()
        m = np.isfinite(y) & np.isfinite(x)
        if m.sum() < min_names:
            return y * np.nan
        # demean both; if beta variance is ~0, just return demeaned y
        y0 = y[m] - y[m].mean()
        x0 = x[m] - x[m].mean()
        denom = float(np.dot(x0, x0))
        if denom <= RIDGE_EPS:
            y_res = y.copy()
            y_res[m] = y0
            return y_res
        beta = float(np.dot(x0, y0) / (denom + RIDGE_EPS))
        y_res = y.copy()
        y_res[m] = y0 - strength * beta * x0
        return y_res
    # BUG FIX: the previous implementation used DataFrame.combine, which applies
    # the function column-by-column (i.e. per-ticker TIME-SERIES regressions).
    # Apply the residualization per DATE (row) for a true cross-sectional fit.
    resid = s.astype(float).copy()
    for dt in s.index:
        resid.loc[dt] = _resid_row(s.loc[dt], b.loc[dt])
    out = (resid.stack(dropna=False)
           .rename('y_pred')
           .reset_index()
           .dropna(subset=['y_pred']))
    return out.set_index('date')[['ticker','y_pred']]
This section trains models on walk-forward training sets and evaluates them on test folds, then combines predictions across multiple AS-OF lags (and optionally horizons).
Train–test loop: models are fit on each walk-forward window (wf_windows_run) and scored on the corresponding held-out test fold.
Lag ensemble: per-lag predictions are blended with train-only IC weights, shrunk toward equal weights by IC_SHRINK_LAMBDA.
(Optional) Horizon ensemble: blends predictions across target horizons (5D, 21D), with weights shrunk toward equal by HORIZON_IC_SHRINK_LAMBDA.
Residualization vs 1D return: final scores are residualized against the prior 1-day return, producing
PRED_USE with (date, ticker, y_pred).from functools import reduce
# --- Train & predict: lag-ensemble + neutralization -------------------------
USE_TRAIN_IC_WEIGHTS = True   # weight lags by train-only IC (shrunk toward equal; see IC_SHRINK_LAMBDA)
IC_SHRINK_LAMBDA = 0.6        # 0=pure IC weights, 1=pure equal weights
USE_FAST = True
TARGET = f"fwd_ret_{TARGET_HORIZONS[0]}d"   # primary target column, e.g. 'fwd_ret_5d'
# As-of lags in TRADING days (1 = current baseline).
# LAG_ENS = [1, 3, 5, 10]
LAG_ENS = [1, 3]
# weighting scheme for the ensemble: "equal" or "ic"
# ("ic" will try to compute mean IC per-lag; if compute_rank_ic isn't defined yet,
# it will auto-fallback to equal weights)
WEIGHT_METHOD = "equal"
# --- Horizon ensemble toggles ---
USE_HORIZON_ENSEMBLE = False    # <— blend 5D + 21D if True
HORIZON_LIST = [5, 21]          # must be subset of TARGET_HORIZONS
HORIZON_IC_SHRINK_LAMBDA = 0.6  # 0 = pure train-IC weights, 1 = equal weights
# --- Model variant toggles ----------------------------------------------------
MODEL_KIND = "ridge"    # {"ridge","elasticnet","huber","hgbt"} (hgbt = HistGBR)
MODEL_PARAMS = {}       # per-model overrides; leave {} for defaults
# default params (used if key missing in MODEL_PARAMS)
MODEL_DEFAULTS = {
    "ridge": {"alpha": 1.0},
    "elasticnet": {"alpha": 0.5, "l1_ratio": 0.2, "max_iter": 5000},
    "huber": {"epsilon": 1.35, "alpha": 1e-4, "max_iter": 1000},
    "hgbt": {"max_depth": 3, "learning_rate": 0.05, "max_iter": 250,
             "l2_regularization": 0.0, "max_leaf_nodes": 31}
}
def _make_pipeline(kind: str, params: dict):
    """
    Build an impute(+scale) -> estimator sklearn Pipeline for the given kind.

    `params` overrides MODEL_DEFAULTS[kind]; linear models get standardized
    features, while the histogram gradient-boosting model is left unscaled.
    Raises ValueError for an unknown `kind`.
    """
    cfg = (MODEL_DEFAULTS.get(kind, {}) | (params or {}))
    if kind == "ridge":
        est = Ridge(alpha=float(cfg["alpha"]), random_state=RANDOM_STATE)
    elif kind == "elasticnet":
        est = ElasticNet(alpha=float(cfg["alpha"]),
                         l1_ratio=float(cfg["l1_ratio"]),
                         max_iter=int(cfg["max_iter"]),
                         random_state=RANDOM_STATE)
    elif kind == "huber":
        est = HuberRegressor(epsilon=float(cfg["epsilon"]),
                             alpha=float(cfg["alpha"]),
                             max_iter=int(cfg["max_iter"]))
    elif kind == "hgbt":
        est = HistGradientBoostingRegressor(
            max_depth=int(cfg["max_depth"]),
            learning_rate=float(cfg["learning_rate"]),
            max_iter=int(cfg["max_iter"]),
            l2_regularization=float(cfg["l2_regularization"]),
            max_leaf_nodes=int(cfg["max_leaf_nodes"]),
            random_state=RANDOM_STATE)
    else:
        raise ValueError(f"Unknown MODEL_KIND={kind}")
    steps = [("impute", SimpleImputer(strategy="median"))]
    if kind in {"ridge", "elasticnet", "huber"}:
        steps.append(("scale", StandardScaler()))  # linear models want standardized X
    steps.append(("est", est))
    return Pipeline(steps)
def fit_predict_wf_with_pipe(df_asof, feature_cols, target_col, windows, pipe) -> pd.DataFrame:
    """
    Walk-forward fit/predict using a caller-supplied sklearn Pipeline.

    Tolerant of different `_split_wf` return signatures: among the returned
    parts, the FIRST DataFrame is treated as the train set and the LAST as
    the test set. Folds without a (non-empty) test set are skipped.

    Returns a tidy frame indexed by 'date' with columns ['ticker', 'y_pred'].
    """
    folds = []
    for win in windows:
        parts = _split_wf(df_asof, win)
        frames = [p for p in parts if isinstance(p, pd.DataFrame)]
        if not frames:
            raise ValueError(f"_split_wf returned no DataFrames for window {win!r}: {type(parts)=}")
        # first DF is train, last DF is test
        train_df = frames[0]
        test_df = frames[-1] if len(frames) >= 2 else None
        if test_df is None or getattr(test_df, "empty", True):
            continue
        model = clone(pipe)  # fresh estimator per fold
        model.fit(train_df[feature_cols], train_df[target_col])
        preds = pd.DataFrame(
            {"ticker": test_df["ticker"].values,
             "y_pred": model.predict(test_df[feature_cols])},
            index=test_df.index
        )
        preds.index.name = "date"
        folds.append(preds)
    if not folds:
        return pd.DataFrame(columns=["ticker", "y_pred"])
    return (pd.concat(folds).sort_index()
            .reset_index().set_index("date")[["ticker", "y_pred"]].sort_index())
def _compute_rank_ic_quick(pred_df: pd.DataFrame,
df_prices: pd.DataFrame,
horizon: int = 5,
min_names: int = 10,
demean: bool = True) -> pd.Series:
# predictions → tidy
p = (pred_df[['ticker','y_pred']]
.rename(columns={'y_pred':'score'})
.rename_axis('date').reset_index())
p['date'] = pd.to_datetime(p['date']).dt.normalize()
p = p[['date','ticker','score']]
# prices → wide, normalized index
px = (df_prices.reset_index(names='date')
.sort_values(['date','ticker'])
.drop_duplicates(['date','ticker'], keep='last')
.pivot(index='date', columns='ticker', values='close')
.astype(float).sort_index())
# realized forward simple returns
if horizon > 0:
fwd = px.pct_change(horizon).shift(-horizon)
elif horizon < 0:
H = abs(horizon); fwd = px.pct_change(H)
else:
fwd = px * 0.0
real = (fwd.stack().rename('fwd')
.reset_index().rename(columns={'level_1':'ticker'}))
ic_df = p.merge(real, on=['date','ticker'], how='left')
if demean:
ic_df['fwd'] = ic_df.groupby('date')['fwd'].transform(lambda x: x - x.mean())
ic_df['score'] = ic_df.groupby('date')['score'].transform(lambda x: x - x.mean())
def _daily_ic(g):
g = g.dropna(subset=['score','fwd'])
if len(g) < min_names: return np.nan
return g[['score','fwd']].corr(method='spearman').iloc[0,1]
ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
ic.index.name = 'date'
ic.name = f'IC_{horizon}d'
return ic
def _asof_extra_lag(df_base, cols, extra):
"""Apply an extra per-ticker shift to features only (keeps TARGET intact)."""
if extra <= 0:
return df_base
d = df_base.copy()
d[cols] = d.groupby('ticker', group_keys=False)[cols].shift(extra)
return d.dropna(subset=cols + [TARGET])
def _neutralize_vs_ret1_quick(pred_df, df_prices):
ret1 = df_prices[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d')
ret1 = (df_prices.assign(ret_1d=ret1)
.assign(date=lambda x: x.index)
.reset_index(drop=True)[['date','ticker','ret_1d']])
sc = pred_df.reset_index().rename(columns={'index':'date'})
m = sc.merge(ret1, on=['date','ticker'], how='left')
def _res(g):
x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
mask = np.isfinite(x) & np.isfinite(y)
if mask.sum() < 10: g['y_pred'] = np.nan; return g
beta = np.dot(x[mask], y[mask]) / (np.dot(x[mask], x[mask]) + 1e-12)
g.loc[mask, 'y_pred'] = y[mask] - beta * x[mask]
return g
return (m.groupby('date', group_keys=False).apply(_res)
.dropna(subset=['y_pred'])
.set_index('date')[['ticker','y_pred']])
def compute_train_ic_weights(df_asof, feature_cols, target_col, lags, df_prices,
                             use_fast=True, alpha=1.0,
                             neutralize_ret1=True, shrink_lambda=0.5,
                             model_kind: str = None, model_params: dict | None = None):
    """
    Train-only IC weights across as-of lags, shrunk toward equal weights.

    For each lag L: re-lag the features by L-1 days, fit the configured model
    on the first walk-forward window's TRAIN slice only, score that same
    slice in-sample, optionally residualize scores vs the prior 1-day return,
    then compute the mean daily Spearman IC against realized forward returns.
    Negative mean ICs are floored at 0 before normalizing; `shrink_lambda`
    blends toward equal weights (0 = pure IC, 1 = pure equal).

    NOTE(review): `use_fast` and `alpha` are currently unused here; the
    function also relies on module-level `wf_windows_run`, `_split_wf`,
    `progress`, `MODEL_KIND`, and `MODEL_PARAMS`.

    Returns {lag: weight} with weights summing to 1.
    """
    # build train window = expanding start .. first fold's effective train_end
    first_w = wf_windows_run[0]
    tr, _, _ = _split_wf(df_asof, first_w)
    if tr.empty:
        # no usable train data -> fall back to equal weights
        return {L: 1.0/len(lags) for L in lags}
    # fit once per lag on TRAIN ONLY, score train, neutralize, compute daily Spearman IC
    ic_means = {}
    px_wide = (df_prices.reset_index(names='date')
               .sort_values(['date','ticker'])
               .drop_duplicates(['date','ticker'], keep='last')
               .pivot(index='date', columns='ticker', values='close')
               .astype(float)
               .sort_index())
    for L in progress(lags, total=len(lags), desc="Train IC by lag"):
        # extra feature lag of L-1 days (L=1 is the unlagged baseline)
        dL = _asof_extra_lag(df_asof, feature_cols, extra=L-1)
        dL = dL.sort_index()  # safe no-op if already sorted
        tr_start, tr_end = tr.index.min(), tr.index.max()
        trL = dL[(dL.index >= tr_start) & (dL.index <= tr_end)]
        if trL.empty:
            ic_means[L] = 0.0
            continue
        pipe = _make_pipeline(model_kind or MODEL_KIND, model_params or MODEL_PARAMS)
        X_tr, y_tr = trL[feature_cols], trL[target_col]
        pipe.fit(X_tr, y_tr)
        yhat = pipe.predict(X_tr)  # in-sample scores (train IC, not test IC)
        pred = pd.DataFrame({"ticker": trL["ticker"].values, "y_pred": yhat}, index=trL.index)
        pred.index.name = "date"
        if neutralize_ret1:
            pred = _neutralize_vs_ret1_quick(pred, df_prices)
        # realized fwd simple returns on train
        H = int(target_col.split('_')[2].rstrip('d'))  # horizon parsed from e.g. 'fwd_ret_5d'
        if pred.empty:
            ic_means[L] = 0.0
            continue
        # ensure pred dates are normalized
        pred.index = pd.to_datetime(pred.index).normalize()
        start, end = pred.index.min(), pred.index.max()
        px_tr = px_wide[(px_wide.index >= start) & (px_wide.index <= end)]
        fwd = px_tr.pct_change(H).shift(-H)
        # tidy merge
        sc = pred.reset_index().rename(columns={'index':'date'})
        real = (fwd.stack().rename('fwd')
                .reset_index()
                .rename(columns={'level_1': 'ticker'}))
        ic_df = sc.merge(real, on=['date','ticker'], how='left')
        def _daily_ic(g):
            # per-date Spearman rank correlation; thin dates return NaN
            g = g.dropna(subset=['y_pred','fwd'])
            if len(g) < 10: return np.nan
            return g[['y_pred','fwd']].corr(method='spearman').iloc[0,1]
        ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
        ic_means[L] = float(ic.mean()) if len(ic) else 0.0
    w = np.array([max(ic_means[L], 0.0) for L in lags], float)  # clip negative ICs at 0
    if w.sum() == 0: w[:] = 1.0  # degenerate case -> equal weights
    w /= w.sum()
    eq = np.ones_like(w) / len(lags)
    w_final = (1 - shrink_lambda)*w + shrink_lambda*eq  # shrink toward equal
    return dict(zip(lags, w_final))
def _cs_zscore_by_date(df_scores: pd.DataFrame) -> pd.DataFrame:
"""
Cross-sectional z-score per date on column 'y_pred'.
Input: index=date, cols=['ticker','y_pred']
Output: same schema with z-scored y_pred.
"""
def _z(g):
s = g['y_pred'].astype(float)
sd = float(s.std(ddof=0))
if not np.isfinite(sd) or sd <= 1e-12:
return pd.Series(0.0, index=s.index)
return (s - float(s.mean())) / sd
return df_scores.groupby(level=0, group_keys=False).apply(lambda g: g.assign(y_pred=_z(g)))
def _make_df_asof_for_h(df_asof_base: pd.DataFrame, target_h: int) -> pd.DataFrame:
"""
Reuse your existing df_asof (features already shifted 1D, leaky features dropped),
but keep only rows where the H-target exists.
"""
tcol = f"fwd_ret_{target_h}d"
assert tcol in df_asof_base.columns, f"Missing target {tcol} in df_asof"
return df_asof_base.dropna(subset=[tcol])
def _fitpredict_one_horizon(target_h: int,
                            model_kind: str = None,
                            model_params: dict | None = None
                            ) -> tuple[dict, pd.DataFrame, pd.DataFrame]:
    """
    Build the lag-ensemble for a given horizon H (5 or 21):
      - per-lag predictions via your WF runner
      - train-only IC weights across lags (if enabled)
      - per-date neutralization vs 1D return
    Relies on module-level df_asof, feature_cols, wf_windows_run, df, LAG_ENS,
    USE_TRAIN_IC_WEIGHTS and IC_SHRINK_LAMBDA.
    Returns:
      pred_by_lag_H : {L: DataFrame(date,[ticker,y_pred_L{L}])}
      pred_ens_raw : DataFrame(date,[ticker, y_pred]) (pre-neutralization)
      PRED_BASE_H : DataFrame(date,[ticker, y_pred]) (post 1D-neutralization)
    """
    tcol = f"fwd_ret_{target_h}d"
    dfH = _make_df_asof_for_h(df_asof, target_h)
    # 1) per-lag predictions
    pred_by_lag_H = {}
    for L in progress(LAG_ENS, total=len(LAG_ENS), desc=f"Lags (H={target_h}d)"):
        # extra feature lag of L-1 days (L=1 is the unlagged baseline)
        dfL = _asof_extra_lag(dfH, feature_cols, extra=L-1)
        pipeH = _make_pipeline(model_kind or MODEL_KIND, model_params or MODEL_PARAMS)
        pL = fit_predict_wf_with_pipe(dfL, feature_cols, tcol, wf_windows_run, pipeH)
        pL = pL.sort_index().rename(columns={'y_pred': f'y_pred_L{L}'})
        pred_by_lag_H[L] = pL[['ticker', f'y_pred_L{L}']]
    # 2) align and weight across lags (train-only IC if enabled)
    dfs = [v.set_index(['ticker'], append=True) for v in pred_by_lag_H.values()]
    pred_ens = reduce(lambda a,b: a.join(b, how='inner'), dfs).sort_index()
    if USE_TRAIN_IC_WEIGHTS:
        W_lag = compute_train_ic_weights(
            df_asof=dfH, feature_cols=feature_cols, target_col=tcol, lags=LAG_ENS, df_prices=df,
            use_fast=True, alpha=1.0, neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
            model_kind=(model_kind or MODEL_KIND), model_params=(model_params or MODEL_PARAMS)
        )
    else:
        W_lag = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}
    # weighted sum of the per-lag prediction columns
    pred_ens['y_pred'] = 0.0
    for L in LAG_ENS:
        pred_ens['y_pred'] += float(W_lag[L]) * pred_ens[f'y_pred_L{L}']
    pred_ens_raw = (pred_ens[['y_pred']]
                    .reset_index()
                    .set_index('date')[['ticker','y_pred']]
                    .sort_index())
    # 3) neutralize vs prior 1D return (same as 5D path for consistency)
    predN = (pred_ens_raw.reset_index().rename(columns={'index':'date'})
             .merge(
                 df.assign(ret_1d=df.groupby('ticker')['close'].pct_change(1))\
                   .assign(date=lambda x: x.index)\
                   .reset_index(drop=True)[['date','ticker','ret_1d']],
                 on=['date','ticker'], how='left'))
    def _res(g):
        # per-date OLS residual of scores vs prior 1-day return;
        # dates with fewer than 10 valid pairs are dropped downstream
        x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
        m = np.isfinite(x) & np.isfinite(y)
        if m.sum() < 10: g['y_pred'] = np.nan; return g
        beta = np.dot(x[m], y[m]) / (np.dot(x[m], x[m]) + 1e-12)
        g.loc[m, 'y_pred'] = y[m] - beta * x[m]
        return g
    PRED_BASE_H = (predN.groupby('date', group_keys=False).apply(_res)
                   .dropna(subset=['y_pred'])
                   .set_index('date')[['ticker','y_pred']])
    # Dedup safety
    PRED_BASE_H = (PRED_BASE_H.reset_index().sort_values(['date','ticker'])
                   .drop_duplicates(['date','ticker'], keep='last')
                   .set_index('date')[['ticker','y_pred']])
    return pred_by_lag_H, pred_ens_raw, PRED_BASE_H
# --- Main lag-ensemble run for the primary TARGET horizon ---
# 1) fit/predict for each as-of lag
pred_by_lag = {}
for L in progress(LAG_ENS, total=len(LAG_ENS), desc="Lag ensemble (main)"):
    # extra feature lag of L-1 days (L=1 is the unlagged baseline)
    dfL = _asof_extra_lag(df_asof, feature_cols, extra=L-1)
    pipe0 = _make_pipeline(MODEL_KIND, MODEL_PARAMS)
    pL = fit_predict_wf_with_pipe(dfL, feature_cols, TARGET, wf_windows_run, pipe0)
    pL = pL.sort_index().rename(columns={'y_pred': f'y_pred_L{L}'})
    pred_by_lag[L] = pL[['ticker', f'y_pred_L{L}']]
    # progress-bar-friendly logging when tqdm is active
    if USE_TQDM and '_tqdm' in globals() and _tqdm is not None:
        _tqdm.write(f"[ens] L={L} | rows={len(pL)} | dates={pL.index.nunique()} | span={pL.index.min()}→{pL.index.max()}")
    else:
        print(f"[ens] L={L} | rows={len(pL)} | dates={pL.index.nunique()} | span={pL.index.min()}→{pL.index.max()}")
# 2) align all lag predictions on (date, ticker)
dfs = [v.set_index(['ticker'], append=True) for v in pred_by_lag.values()]  # -> MultiIndex (date,ticker)
pred_ens = reduce(lambda a,b: a.join(b, how='inner'), dfs).sort_index()
lag_cols = [f'y_pred_L{L}' for L in LAG_ENS]
# 3) weights across lags: train-only IC (preferred), legacy test-set IC, or equal
if USE_TRAIN_IC_WEIGHTS:
    WEIGHTS = compute_train_ic_weights(
        df_asof, feature_cols, TARGET, LAG_ENS, df,
        neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
        model_kind=MODEL_KIND, model_params=MODEL_PARAMS
    )
    print("[ens] Train-only IC weights:", {L: round(WEIGHTS[L],4) for L in LAG_ENS})
elif WEIGHT_METHOD == "ic" and 'compute_rank_ic' in globals():
    try:
        # original test-set IC weighting path
        px_tidy = df[['ticker','close']].copy()
        if getattr(px_tidy.index, "tz", None) is not None:
            px_tidy.index = px_tidy.index.tz_localize(None)
        px_tidy.index = pd.to_datetime(px_tidy.index).normalize()
        ic_means = {}
        for L in LAG_ENS:
            tmp = (pred_ens[[f'y_pred_L{L}']]
                   .rename(columns={f'y_pred_L{L}':'y_pred'})
                   .reset_index(level=1)
                   .rename_axis('date'))
            ic = compute_rank_ic(tmp, px_tidy, horizon=TARGET_HORIZONS[0], min_names=10, demean=True)
            ic_means[L] = float(ic.mean()) if len(ic) else 0.0
        w = np.array([max(ic_means[L], 0.0) for L in LAG_ENS], float)  # clip negatives
        if w.sum() == 0: w[:] = 1.0
        w = w / w.sum()
        WEIGHTS = dict(zip(LAG_ENS, w))
        print("[ens] IC weights:", {L: round(WEIGHTS[L],4) for L in LAG_ENS})
    except Exception as e:
        # best-effort path: any failure falls back to equal weights
        print(f"[ens] IC weighting failed ({e}); using equal weights.")
        WEIGHTS = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}
else:
    WEIGHTS = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}
# 4) weighted average prediction
pred_ens['y_pred'] = 0.0
for L in LAG_ENS:
    pred_ens['y_pred'] += WEIGHTS[L] * pred_ens[f'y_pred_L{L}']
# reshape back to expected schema: index=date, columns ['ticker','y_pred']
pred_ens = (pred_ens[['y_pred']]
            .reset_index()
            .set_index('date')[['ticker','y_pred']]
            .sort_index())
print("Pred-ensemble rows:", len(pred_ens),
      "| unique dates:", pred_ens.index.nunique(),
      "| tickers:", pred_ens['ticker'].nunique())
# 5) per-date neutralization vs prior 1D return
ret1 = (df[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d'))
ret1 = (df.assign(ret_1d=ret1)
        .assign(date=lambda x: x.index)
        .reset_index(drop=True)[['date','ticker','ret_1d']])
predN = (pred_ens.reset_index().rename(columns={'index':'date'})
         .merge(ret1, on=['date','ticker'], how='left'))
def _resid(g):
    # per-date OLS residual vs ret_1d; dates with <10 valid pairs are dropped
    x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
    m = np.isfinite(x) & np.isfinite(y)
    if m.sum() < 10:
        g['y_pred'] = np.nan
        return g
    beta = np.dot(x[m], y[m]) / (np.dot(x[m], x[m]) + 1e-12)
    g.loc[m, 'y_pred'] = y[m] - beta * x[m]
    return g
PRED_USE = (predN.groupby('date', group_keys=False).apply(_resid)
            .dropna(subset=['y_pred'])
            .set_index('date')[['ticker','y_pred']])
Lag ensemble (main): 0%| | 0/2 [00:00<…
[ens] L=1 | rows=50766 | dates=854 | span=2021-08-01 00:00:00→2024-12-22 00:00:00 [ens] L=3 | rows=50760 | dates=854 | span=2021-08-01 00:00:00→2024-12-22 00:00:00
Train IC by lag: 0%| | 0/2 [00:00<…
[ens] Train-only IC weights: {1: 0.5134, 3: 0.4866}
Pred-ensemble rows: 53574 | unique dates: 854 | tickers: 60
This section extends the lag-ensemble to blend multiple target horizons (5D and 21D in this case).
Again, all weights are learned only on the training data of the first fold and applied to the test folds.
Per-horizon lag-ensembles
_fitpredict_one_horizon(5) and _fitpredict_one_horizon(21).PRED_BASE_H) neutralized vs prior 1D returns.Train-only horizon weights
H ∈ {5, 21}:HORIZON_IC_SHRINK_LAMBDA) to reduce variance.Blend across horizons
z5, z21). PRED_BASE_HENS = horizon-ensemble prediction series.Final predictions
PRED_USE = PRED_BASE_HENS if horizon-ensemble is enabled.# ----- 21D clone + Horizon ensemble (toggle) -----
if USE_HORIZON_ENSEMBLE:
    assert set(HORIZON_LIST).issubset(set(TARGET_HORIZONS)), "HORIZON_LIST must be in TARGET_HORIZONS"
    # 5D & 21D paths
    pred_by_lag_5, pred_ens_5_raw, PRED_BASE_5D = _fitpredict_one_horizon(5, MODEL_KIND, MODEL_PARAMS)
    pred_by_lag_21, pred_ens_21_raw, PRED_BASE_21D = _fitpredict_one_horizon(21, MODEL_KIND, MODEL_PARAMS)
    # --- Train-only horizon weights (IC on train; shrink toward equal) ---
    def _train_ic_for_horizon(H: int,
                              model_kind: str = MODEL_KIND,
                              model_params: dict | None = MODEL_PARAMS) -> float:
        """
        Train-time IC for a given horizon H using the SAME model variant as production.

        Fits per-lag models on the first walk-forward window's TRAIN slice,
        blends them with the train-only lag weights, neutralizes vs the 1-day
        return, and returns the mean daily Spearman IC on that train span
        (0.0 when no usable train data/predictions exist).
        """
        tcol = f"fwd_ret_{H}d"
        dfH = _make_df_asof_for_h(df_asof, H)
        # lag weights on TRAIN for this horizon, with same model variant
        Wlag = compute_train_ic_weights(
            df_asof=dfH, feature_cols=feature_cols, target_col=tcol, lags=LAG_ENS, df_prices=df,
            neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
            model_kind=model_kind, model_params=model_params
        )
        # first train DataFrame from the first window
        parts = _split_wf(dfH, wf_windows_run[0])
        dfs = [p for p in parts if isinstance(p, pd.DataFrame)]
        tr = dfs[0] if dfs else pd.DataFrame()
        if tr.empty:
            return 0.0
        # template pipeline; clone per-lag
        pipe_tpl = _make_pipeline(model_kind, model_params)
        preds = []
        for L in LAG_ENS:
            # re-lag features, restrict to the train span, drop rows lacking the target
            dL = _asof_extra_lag(dfH, feature_cols, extra=L-1)
            dL = dL[(dL.index >= tr.index.min()) & (dL.index <= tr.index.max())]
            dL = dL.dropna(subset=[tcol])
            if dL.empty:
                continue
            pp = clone(pipe_tpl)
            X_tr, y_tr = dL[feature_cols], dL[tcol]
            pp.fit(X_tr, y_tr)
            yhat = pp.predict(X_tr)  # in-sample train scores
            out = pd.DataFrame({"ticker": dL["ticker"].values,
                                f"y_pred_L{L}": yhat}, index=dL.index)
            out.index.name = "date"
            preds.append(out)
        if not preds:
            return 0.0
        # weight per-lag train preds with Wlag
        P = reduce(lambda a,b: a.join(b, how='inner'),
                   [p.set_index(['ticker'], append=True).sort_index() for p in preds])
        P['y_pred'] = 0.0
        for L in LAG_ENS:
            col = f'y_pred_L{L}'
            if col in P.columns:
                P['y_pred'] += float(Wlag[L]) * P[col]
        P = (P[['y_pred']].reset_index().set_index('date')[['ticker','y_pred']].sort_index())
        # neutralize vs. 1D return and compute IC on the train span
        Pn = _neutralize_vs_ret1_quick(P, df)
        ic_series = _compute_rank_ic_quick(Pn, df[['ticker','close']], horizon=H, min_names=10, demean=True)
        return float(ic_series.mean()) if len(ic_series) else 0.0
    # compute horizon weights on train
    ic_train_by_H = {}
    for H in progress(HORIZON_LIST, total=len(HORIZON_LIST), desc="H-weights train IC"):
        ic_train_by_H[H] = max(_train_ic_for_horizon(H), 0.0)  # clip negative IC at 0
    w_raw = np.array([ic_train_by_H[H] for H in HORIZON_LIST], float)
    if w_raw.sum() == 0:
        w_raw[:] = 1.0  # degenerate case -> equal weights
    w_raw /= w_raw.sum()
    w_eq = np.ones_like(w_raw) / len(HORIZON_LIST)
    # shrink toward equal weights to reduce variance of the IC estimate
    wH = (1 - HORIZON_IC_SHRINK_LAMBDA) * w_raw + HORIZON_IC_SHRINK_LAMBDA * w_eq
    H_WEIGHTS = dict(zip(HORIZON_LIST, wH))
    H_W_SUMMARY = {
        'H_w5': float(H_WEIGHTS.get(5, np.nan)),
        'H_w21': float(H_WEIGHTS.get(21, np.nan)),
    }
    print("[h-ens] weights detail:", H_W_SUMMARY)
    print("[h-ens] Train-only horizon weights:", {int(k): round(v,4) for k,v in H_WEIGHTS.items()})
    # --- Per-date z-score + blend across horizons (outer join, fill mean=0) ---
    Z5 = _cs_zscore_by_date(PRED_BASE_5D).rename(columns={'y_pred':'z5'})
    Z21 = _cs_zscore_by_date(PRED_BASE_21D).rename(columns={'y_pred':'z21'})
    Z = (Z5.reset_index().merge(Z21.reset_index(), on=['date','ticker'], how='outer'))
    Z[['z5','z21']] = Z[['z5','z21']].fillna(0.0)  # missing horizon score -> cross-sectional mean (0 after z-scoring)
    y_blend = H_WEIGHTS.get(5, 0)*Z['z5'] + H_WEIGHTS.get(21, 0)*Z['z21']
    PRED_BASE_HENS = (pd.DataFrame({'date': Z['date'], 'ticker': Z['ticker'], 'y_pred': y_blend})
                      .set_index('date')[['ticker','y_pred']].sort_index())
    PRED_USE = PRED_BASE_HENS.copy()
else:
    # keep existing 5D path result in PRED_USE
    pass
At this stage we apply risk controls to the test-set predictions PRED_USE.
The purpose is to align raw model outputs with realistic portfolio constraints.
Deduplication & snapshot
(date, ticker) row remains per prediction (keep last if duplicates). PRED_BASE = post–1D-return neutralized predictions, pre-beta-neutral overlay (useful for ablation tests).ADV eligibility filter (if enabled)
ADV_PCT). Beta-neutral overlay (if enabled)
Final check
(date, ticker) rows as a debug sanity check.PRED_USE: final test-set predictions after risk controls. PRED_BASE: snapshot before beta-neutralization, for comparison.# --- Risk controls (toggled) ---
# 0) Ensure uniqueness before overlays & keep a pre-beta snapshot for ablations
def _dedup_scores(df_scores: pd.DataFrame) -> pd.DataFrame:
return (df_scores.reset_index().rename(columns={'index':'date'})
.sort_values(['date','ticker'])
.drop_duplicates(['date','ticker'], keep='last')
.set_index('date')[['ticker','y_pred']])
# Apply post-prediction risk controls to the final test-set scores.
PRED_USE = _dedup_scores(PRED_USE)  # post 1D-return neutralization; one row per (date, ticker)
PRED_BASE = PRED_USE.copy()  # <-- snapshot PRE-BETA for ablations
# 1) ADV eligibility
if 'ELIG_ADV' not in globals():
    ELIG_ADV = None
if USE_ADV_ELIGIBILITY:
    ELIG_ADV = build_adv_eligibility(df, lookback=ADV_LOOKBACK_DAYS, pct=ADV_PCT)
    print(f"[elig] ADV lookback={ADV_LOOKBACK_DAYS}d, pct={ADV_PCT:.2f} → shape {ELIG_ADV.shape}")
# 2) Beta-neutral overlay on scores
if USE_BETA_NEUTRAL:
    BETAS = build_rolling_beta(df, lookback=BETA_LOOKBACK_DAYS)
    PRED_USE = neutralize_vs_beta(PRED_BASE, BETAS,
                                  min_names=MIN_XS_NAMES,
                                  strength=BETA_NEUTRAL_STRENGTH)
    PRED_USE = _dedup_scores(PRED_USE)  # re-dedup after the overlay
    print(f"[beta] applied beta-neutral overlay (lookback={BETA_LOOKBACK_DAYS}d, strength={BETA_NEUTRAL_STRENGTH})")
# sanity check: duplicates should be 0 after the dedup passes above
_dups = (PRED_USE.reset_index().rename(columns={'index':'date'})
         .duplicated(['date','ticker']).sum())
print(f"[debug] duplicate (date,ticker) rows in PRED_USE: {_dups}")
[elig] ADV lookback=60d, pct=0.20 → shape (2763, 60) [debug] duplicate (date,ticker) rows in PRED_USE: 0
We now evaluate the test-set predictions by running the execution-side backtest and reporting core portfolio stats.
Inputs
PRED_USE – final test predictions (after train→test WF pipeline, lag/horizon ensembles, and post-prediction overlays).px – prices (close) for realized returns.Backtest logic
REBALANCE_FREQ, trade next day (as-of discipline), hold for HOLDING_PERIOD via rolling-mean overlap, apply transaction costs.ret, turnover, and cumulative equity.Reports
perf_stats: CAGR, annualized Vol, Sharpe, Max Drawdown, and sample size.px = df[['ticker','close']].copy()
# Headline backtest on the final predictions with all active overlays.
bt_main, w_held_main = backtest(
    PRED_USE, px,
    long_short=LONG_SHORT,
    top_q=TOP_QUANTILE,
    freq=REBALANCE_FREQ,
    holding_period=HOLDING_PERIOD,
    tcost_bps=TCOST_BPS,
    eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
    per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
    gross_leverage=GROSS_LEVERAGE,
    debug=False
)
# aliases under the 5D naming
bt_5d = bt_main
w_held_5d = w_held_main
# labels depend on whether the horizon ensemble produced PRED_USE
curve_title = 'Strategy Equity Curve (horizon-ensemble)' if USE_HORIZON_ENSEMBLE else 'Strategy Equity Curve (5D target)'
print_label = 'Perf (h-ens)' if USE_HORIZON_ENSEMBLE else 'Perf (5D target)'
print(f"{print_label}:", perf_stats(bt_main['ret']))
print("Avg |w| per day:", float(w_held_main.abs().sum(axis=1).mean()))
print("Avg names held:", float((w_held_main.abs() > 1e-12).sum(axis=1).mean()))
print("Avg daily turnover:", float(bt_main['turnover'].mean()))
plot_equity_curve(bt_main['equity'], title=curve_title)
# eligibility / cap diagnostics
if USE_ADV_ELIGIBILITY and ELIG_ADV is not None:
    elig_frac = ELIG_ADV.mean(axis=1)
    print(f"[elig] mean eligible names/day: {float(ELIG_ADV.sum(axis=1).mean()):.1f}")
    print(f"[elig] mean eligible fraction: {elig_frac.mean():.2%} | min: {elig_frac.min():.2%}")
if USE_PER_NAME_CAP and PER_NAME_CAP:
    print(f"[cap] per-name cap active at {PER_NAME_CAP:.2%} (long_short={LONG_SHORT})")
Perf (5D target): {'CAGR': 0.09731340486377915, 'Vol': 0.03370629225265959, 'Sharpe': 2.772443375609235, 'MaxDD': -0.018407797326819852, 'N': 849}
Avg |w| per day: 0.46857950530035364
Avg names held: 31.288574793875146
Avg daily turnover: 0.07831330977620733
[elig] mean eligible names/day: 45.5 [elig] mean eligible fraction: 75.83% | min: 0.00% [cap] per-name cap active at 2.00% (long_short=True)
In this section, we take the final test-set predictions and evaluate execution-side overlays to see how each affects performance.
run_variant(): evaluates a labeled variant by optionally applying:beta_strength ∈ [0,1].CAGR, Vol, Sharpe, MaxDD, N.BETA_NEUTRAL_STRENGTH).These ablations isolate the execution-side controls and quantify their impact on realized performance without contaminating train/test splits.
def run_variant(label,
                pred_base: pd.DataFrame,
                adv: pd.DataFrame | None = None,
                betas: pd.DataFrame | None = None,
                beta_strength: float = 1.0,
                per_name_cap: float | None = None):
    """
    Evaluate a variant: optional ADV eligibility and optional beta neutralization (with strength).
    pred_base must be (index=date, cols=['ticker','y_pred']) AFTER 1D-return neutralization.
    Returns the perf_stats dict with an added 'Label' key.
    """
    scores = pred_base
    if betas is not None and beta_strength > 0:
        scores = neutralize_vs_beta(scores, betas, min_names=MIN_XS_NAMES, strength=beta_strength)
        # keep exactly one row per (date, ticker) after the overlay
        scores = (scores.reset_index()
                        .sort_values(['date', 'ticker'])
                        .drop_duplicates(['date', 'ticker'], keep='last')
                        .set_index('date')[['ticker', 'y_pred']])
    result, _ = backtest(scores, df[['ticker', 'close']],
                         long_short=LONG_SHORT, top_q=TOP_QUANTILE,
                         freq=REBALANCE_FREQ, holding_period=HOLDING_PERIOD,
                         tcost_bps=TCOST_BPS, eligibility=adv,
                         per_name_cap=per_name_cap,  # pass cap through
                         debug=False)
    stats = perf_stats(result['ret'])
    stats['Label'] = label
    return stats
# Small ablation table around PRED_BASE (pre-beta) and BETAS / ELIG_ADV.
_base = PRED_BASE
_cap = PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None
_adv = ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None

rows = [
    run_variant("Base (no overlays)", _base, adv=None, betas=None, per_name_cap=_cap),
    run_variant("ADV only", _base, adv=_adv, betas=None, per_name_cap=_cap),
]
# Beta overlays only make sense when rolling betas were computed upstream.
if 'BETAS' in globals():
    rows += [
        run_variant("Beta 1.0 only", _base, adv=None, betas=BETAS, beta_strength=1.0, per_name_cap=_cap),
        run_variant("Beta 0.5 only", _base, adv=None, betas=BETAS, beta_strength=0.5, per_name_cap=_cap),
        run_variant("ADV + Beta", _base, adv=_adv, betas=BETAS, beta_strength=BETA_NEUTRAL_STRENGTH, per_name_cap=_cap),
    ]
abl = (pd.DataFrame(rows)
         .set_index('Label')[['CAGR', 'Vol', 'Sharpe', 'MaxDD', 'N']]
         .sort_values('Sharpe', ascending=False))
display(abl.round(3))
| CAGR | Vol | Sharpe | MaxDD | N | |
|---|---|---|---|---|---|
| Label | |||||
| Base (no overlays) | 0.0970 | 0.0340 | 2.7720 | -0.0180 | 849 |
| ADV only | 0.0970 | 0.0340 | 2.7720 | -0.0180 | 849 |
We assess the cross-sectional predictive power of our final test-set predictions PRED_USE using Rank-IC (daily Spearman correlation between scores and future returns).
compute_rank_ic()
Tidy-merges predictions with realized returns and computes daily Spearman IC, with optional demeaning per date to remove market drift.
For `horizon > 0`, returns are forward over h days and shifted to avoid leakage; `horizon < 0` is a backward-looking sanity (look-ahead) probe; `min_names` is the minimum cross-section size per day, below which the day's IC is NaN. The true IC is computed on the primary horizon (`H = TARGET_HORIZONS[0]`).
We report the mean IC, the IC standard deviation, the t-statistic $\frac{\bar{IC}}{\sigma(IC)}\sqrt{N}$, and the number of days.
Rolling IC
60-day rolling mean IC plot to visualize temporal stability.
Cross-sectional dispersion sanity check
Warns if daily score variance is zero (Spearman undefined).
+1D lag probe
Shifts predictions by one day before evaluation to ensure the signal is not relying on same-day artifacts.
Shuffle tests (null baselines)
(a) Cross-section shuffle per date — randomizes ranks within each day (IC should ≈ 0).
(b) Calendar shuffle — permutes the return dates while keeping predictions fixed (IC should ≈ 0).
Lead/Lag IC profile
IC across horizons −15…+15 days:
These diagnostics quantify out-of-sample ranking skill, check for leakage or timing artifacts, and reveal where the signal’s horizon is strongest.
# Rank-IC diagnostics: tidy price panel with a tz-naive, normalized date index.
px_tidy = df[['ticker', 'close']].copy()
_idx = px_tidy.index
if getattr(_idx, "tz", None) is not None:
    _idx = _idx.tz_localize(None)
px_tidy.index = pd.to_datetime(_idx).normalize()
def compute_rank_ic(
    pred_df: pd.DataFrame,
    df_prices: pd.DataFrame,
    horizon: int = 5,
    min_names: int = 10,
    demean: bool = True
) -> pd.Series:
    """
    Daily cross-sectional Spearman rank-IC between scores and realized returns.

    Parameters
    ----------
    pred_df : DataFrame indexed by date with columns ['ticker', 'y_pred'].
    df_prices : tidy prices indexed by date with columns ['ticker', 'close'].
    horizon : >0 -> forward h-day returns (shifted back; no look-ahead);
              <0 -> backward |h|-day returns (leakage probe); 0 -> all zeros.
    min_names : minimum names per day; smaller days yield NaN and are dropped.
    demean : subtract the per-date mean from scores and returns. (Rank
             correlation is shift-invariant, so this mainly guards helpers
             that reuse the demeaned frame.)

    Returns
    -------
    pd.Series of daily ICs named 'IC_{horizon}d', indexed by normalized date.
    """
    # predictions -> tidy (date, ticker, score)
    p = (pred_df[['ticker', 'y_pred']]
         .rename(columns={'y_pred': 'score'})
         .rename_axis('date')
         .reset_index())
    p['date'] = pd.to_datetime(p['date']).dt.normalize()
    p = p[['date', 'ticker', 'score']]
    # prices -> tidy + normalized, tz-naive index
    px = df_prices[['ticker', 'close']].copy()
    if getattr(px.index, "tz", None) is not None:
        px.index = px.index.tz_localize(None)
    px.index = pd.to_datetime(px.index).normalize()
    # realized returns per ticker.
    # BUGFIX: the shift must stay inside the per-ticker group. A global
    # .shift(-horizon) on the concatenated series moves values across ticker
    # boundaries, so rows pick up another ticker's return whenever rows are
    # interleaved by date (and silently relies on ticker-blocked ordering
    # otherwise). Both pct_change and groupby-shift are transforms, so the
    # result stays aligned with px's original row order.
    if horizon > 0:
        fwd = px.groupby('ticker')['close'].pct_change(horizon)
        fwd = fwd.groupby(px['ticker'].to_numpy()).shift(-horizon)
    elif horizon < 0:
        H = abs(horizon)
        fwd = px.groupby('ticker')['close'].pct_change(H)
    else:
        fwd = pd.Series(0.0, index=px.index)
    real = (px.assign(fwd=fwd)
              .assign(date=lambda x: x.index)
              .reset_index(drop=True)[['date', 'ticker', 'fwd']])
    # merge preds with realized returns
    ic_df = p.merge(real, on=['date', 'ticker'], how='left')
    if demean:
        ic_df['fwd'] = ic_df.groupby('date')['fwd'].transform(lambda x: x - x.mean())
        ic_df['score'] = ic_df.groupby('date')['score'].transform(lambda x: x - x.mean())

    def _daily_ic(g):
        # Spearman correlation on one day's cross-section; NaN if too thin.
        g = g.dropna(subset=['score', 'fwd'])
        if len(g) < min_names:
            return np.nan
        return g[['score', 'fwd']].corr(method='spearman').iloc[0, 1]

    ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
    ic.index.name = 'date'
    ic.name = f'IC_{horizon}d'
    return ic
# --- True IC on the primary horizon ---
H = TARGET_HORIZONS[0]
ic_true = compute_rank_ic(PRED_USE, px_tidy, horizon=H, min_names=10, demean=True)

N = len(ic_true)
if N:
    ic_mean = float(ic_true.mean())
    ic_std = float(ic_true.std())
else:
    ic_mean = ic_std = np.nan
# t-stat of the mean daily IC; undefined when there are no days or zero std.
t_stat = (ic_mean / ic_std * np.sqrt(N)) if (N and ic_std > 0) else np.nan
print(f"IC mean: {ic_mean:.4f} | IC std: {ic_std:.4f} | t-stat: {t_stat:.2f} | N days: {N}")
plot_rolling_ic(ic_true, window=60, title=f"60D Rolling Rank IC ({H}D horizon)")
# Sanity: cross-sectional dispersion (Spearman is undefined on constant scores)
cs_std = PRED_USE.groupby(level=0)['y_pred'].std()
if (cs_std.fillna(0) == 0).all():
    print("All days have zero cross-sectional std; Spearman IC will be NaN.")

# +1D lag probe: shift each ticker's score forward one day; a large drop in IC
# would indicate the signal relies on same-day artifacts.
def _lag1(g):
    return g.assign(y_pred=g['y_pred'].shift(1))

pred_lag1 = (PRED_USE.copy()
             .groupby('ticker', group_keys=False)
             .apply(_lag1))
ic_lag1 = compute_rank_ic(pred_lag1, px_tidy, horizon=H, min_names=10, demean=True)
print("IC (+1D lag) mean:", float(ic_lag1.mean()) if len(ic_lag1) else np.nan)
# Shuffle tests: seeded null baselines — both ICs should be approximately 0.
rng = np.random.default_rng(42)

# (a) cross-section shuffle: permute scores within each date
def _shuffle_xs(g):
    return g.assign(score=rng.permutation(g['score'].values))

pred_shuf = (PRED_USE.rename(columns={'y_pred': 'score'})
             .rename_axis('date').reset_index()
             .groupby('date', group_keys=False)
             .apply(_shuffle_xs)
             .set_index('date')
             .rename(columns={'score': 'y_pred'}))
ic_shuf = compute_rank_ic(pred_shuf, px_tidy, horizon=H, min_names=10, demean=True)
print("IC (shuffle xs) mean:", float(ic_shuf.mean()) if len(ic_shuf) else np.nan)

# (b) calendar shuffle: permute the dates of the returns, predictions fixed
u_dates = px_tidy.index.unique()
perm_map = dict(zip(u_dates, rng.permutation(u_dates)))
px_perm = px_tidy.copy()
px_perm.index = px_perm.index.map(perm_map)
ic_cal = compute_rank_ic(PRED_USE, px_perm, horizon=H, min_names=10, demean=True)
print("IC (calendar shuffle) mean:", float(ic_cal.mean()) if len(ic_cal) else np.nan)

# Lead/lag IC profile across horizons -15..+15 (where is the edge strongest?)
_ = lead_lag_ic_profile_plotly(
    PRED_USE, px_tidy, horizons=range(-15, 16),
    compute_rank_ic_fn=compute_rank_ic
)
IC mean: 0.0934 | IC std: 0.2053 | t-stat: 13.18 | N days: 840
IC (+1D lag) mean: 0.0916633364153255 IC (shuffle xs) mean: 0.0038697698595677203 IC (calendar shuffle) mean: 0.006396083685574156
Lead/Lag IC: 0%| | 0/31 [00:00<…
# --- Annual breakdown (perf + IC) ---
# Performance stats per calendar year.
yr_rows = []
for y, s in bt_5d['ret'].groupby(bt_5d.index.year):
    d = perf_stats(s)  # dict of CAGR/Vol/Sharpe/MaxDD/N
    d['year'] = int(y)
    yr_rows.append(d)
yr_perf = pd.DataFrame(yr_rows).set_index('year').sort_index()

# Mean daily IC per year (only when the IC series was computed upstream).
if 'ic_true' in globals() and isinstance(ic_true, pd.Series) and len(ic_true):
    yr_ic = ic_true.groupby(ic_true.index.year).mean().rename('IC_mean').to_frame()
    yr_ic.index.name = 'year'
else:
    yr_ic = pd.DataFrame(columns=['IC_mean'])

annual = yr_perf.join(yr_ic, how='left')
display(annual.round(3))
| CAGR | Vol | Sharpe | MaxDD | N | IC_mean | |
|---|---|---|---|---|---|---|
| year | ||||||
| 2021 | 0.0130 | 0.0310 | 0.4300 | -0.0140 | 101 | 0.0420 |
| 2022 | 0.0830 | 0.0340 | 2.3820 | -0.0170 | 251 | 0.0850 |
| 2023 | 0.1200 | 0.0370 | 3.0470 | -0.0180 | 250 | 0.1040 |
| 2024 | 0.1260 | 0.0310 | 3.8810 | -0.0160 | 247 | 0.1140 |
We evaluate each AS-OF lag separately and compare them to the final ensemble on the test folds.
1) Per-lag IC means — for each lag L we optionally residualize scores against the prior 1D return (`neutralize_single=True`), then compute the daily Spearman rank-IC on the test set at horizon H.
2) Per-lag backtests vs. ensemble — inputs: `pred_by_lag = {L: (date, [ticker, y_pred_L{L}])}`, `pred_ensemble = (date, [ticker, y_pred])`, prices `df_prices`, horizon `H`. Outputs: `s` (Series of per-lag mean IC), `perf_df` (per-lag perf metrics), `eq_df` (equity curves per lag and ensemble). Note: this is a test-only analysis — no retraining occurs here. It helps attribute where the ensemble's edge comes from and whether any single lag dominates.
# --- Lag-ensemble diagnostics: per-lag ICs + equity curves -------------------
def _px_tidy(df_prices):
px = df_prices[['ticker','close']].copy()
if getattr(px.index, "tz", None) is not None:
px.index = px.index.tz_localize(None)
px.index = pd.to_datetime(px.index).normalize()
return px
def _neutralize_vs_ret1(pred_df, df_prices):
"""Per-date residualize y_pred against prior 1D close-to-close return."""
ret1 = df_prices[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d')
ret1 = (df_prices.assign(ret_1d=ret1)
.assign(date=lambda x: x.index)
.reset_index(drop=True)[['date','ticker','ret_1d']])
sc = pred_df.reset_index().rename(columns={'index':'date'})
m = sc.merge(ret1, on=['date','ticker'], how='left')
def _resid_per_date(g):
x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
mask = np.isfinite(x) & np.isfinite(y)
if mask.sum() < 10:
g['y_pred'] = np.nan
return g
beta = np.dot(x[mask], y[mask]) / (np.dot(x[mask], x[mask]) + 1e-12)
g.loc[mask, 'y_pred'] = y[mask] - beta * x[mask]
return g
out = (m.groupby('date', group_keys=False).apply(_resid_per_date)
.dropna(subset=['y_pred'])
.set_index('date')[['ticker','y_pred']])
return out
def lag_ens_diagnostics(pred_by_lag: dict,
                        pred_ensemble: pd.DataFrame,
                        df_prices: pd.DataFrame,
                        horizon: int,
                        neutralize_single: bool = True,
                        title_suffix: str = ""):
    """
    Per-lag IC and long-only backtest diagnostics versus the lag ensemble.

    pred_by_lag: {L: DataFrame(index=date, cols=['ticker', f'y_pred_L{L}'])}
    pred_ensemble: DataFrame(index=date, cols=['ticker','y_pred']) (already neutralized)
    df_prices: wide tidy prices (the 'df' you use everywhere)
    horizon: forward-return horizon (days) for the IC computation
    neutralize_single: residualize each single-lag score vs the prior 1D return
    title_suffix: text appended to the plot titles

    Returns (s, perf_df, eq_df): the per-lag mean-IC series, the per-model perf
    table (sorted by Sharpe), and the equity curves (one column per lag + 'Ens').
    Side effects: progress bars, an IC bar chart, an equity overlay plot, and a
    displayed perf table.
    """
    # Execution settings fall back to module-level config when defined.
    TOP_Q = globals().get('TOP_QUANTILE', 0.2)
    FREQ = globals().get('REBALANCE_FREQ', 'W-FRI')
    HP = globals().get('HOLDING_PERIOD', 5)
    TCOST = globals().get('TCOST_BPS', 0)
    px = _px_tidy(df_prices)
    lags = sorted(pred_by_lag.keys())
    # --- 1) per-lag IC means
    ic_means = {}
    preds_use = {}  # (possibly neutralized) per-lag scores, reused for the backtests
    for L in progress(lags, total=len(lags), desc="Per-lag IC"):
        pL = pred_by_lag[L].copy()
        # accept either the lag-suffixed column name or a plain 'y_pred'
        col = f'y_pred_L{L}'
        if col not in pL.columns:
            col = 'y_pred'
        pL = pL.rename(columns={col: 'y_pred'})
        if neutralize_single:
            pL = _neutralize_vs_ret1(pL, df_prices)
        preds_use[L] = pL
        ic = compute_rank_ic(pL, px, horizon=horizon, min_names=10, demean=True)
        ic_means[L] = float(ic.mean()) if len(ic) else np.nan
    # --- Plot per-lag ICs
    # NOTE(review): bar_ic_means appears to return the IC values as a Series
    # indexed by lag (used via s.loc[L] below) — confirm against its definition.
    s = bar_ic_means(ic_means, title=f'Per-lag mean IC (H={horizon}d){(" " + title_suffix) if title_suffix else ""}')
    # --- 2) Backtest each lag and the ensemble; plot equity overlay
    eq = {}
    rows = []
    # individual lags — long-only here (long_short=False), unlike the main run
    for L in progress(lags, total=len(lags), desc="Per-lag backtests"):
        bt, _ = backtest(
            preds_use[L], df_prices[['ticker','close']],
            long_short=False, top_q=TOP_Q, freq=FREQ,
            holding_period=HP, tcost_bps=TCOST,
            eligibility=(ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None),
            per_name_cap=(PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None),
            debug=False
        )
        eq[f"L{L}"] = bt['equity'].rename(f"L{L}")
        stats = perf_stats(bt['ret'])
        stats.update(dict(model=f"L{L}", IC_mean=s.loc[L]))
        rows.append(stats)
    # ensemble (already neutralized upstream)
    btE, _ = backtest(
        pred_ensemble, df_prices[['ticker','close']],
        long_short=False, top_q=TOP_Q, freq=FREQ,
        holding_period=HP, tcost_bps=TCOST,
        eligibility=(ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None),
        per_name_cap=(PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None),
        debug=False
    )
    eq["Ens"] = btE['equity'].rename("Ens")
    eq_df = pd.concat(eq, axis=1).dropna(how='all')
    multi_equity_plot(eq, title=f"Equity curves: individual lags vs ensemble{(' ' + title_suffix) if title_suffix else ''}")
    perf_df = pd.DataFrame(rows).set_index('model').sort_values('Sharpe', ascending=False)
    display(perf_df.round(3))
    return s, perf_df, eq_df
# Run the per-lag diagnostics on the final test-set predictions.
# `df` is passed whole; the helper slices the ['ticker','close'] columns itself.
ic_by_lag, perf_by_lag, eq_df = lag_ens_diagnostics(
    pred_by_lag,
    PRED_USE,
    df,
    horizon=TARGET_HORIZONS[0],
    neutralize_single=True,
    title_suffix="(h-ens)" if USE_HORIZON_ENSEMBLE else "(5D)"
)
Per-lag IC: 0%| | 0/2 [00:00<…
Per-lag backtests: 0%| | 0/2 [00:00<…
| CAGR | Vol | Sharpe | MaxDD | N | IC_mean | |
|---|---|---|---|---|---|---|
| model | ||||||
| L1 | 0.0700 | 0.0480 | 1.4240 | -0.0510 | 849 | 0.0910 |
| L3 | 0.0640 | 0.0490 | 1.2940 | -0.0540 | 849 | 0.0870 |
We quantify any remaining market-beta exposure in the test-set predictions after overlays.

Method: regress the `y_pred` scores cross-sectionally on rolling betas each day:

$$\text{beta\_slope}_t = \frac{\sum_i (x_{it}-\bar x_t)(y_{it}-\bar y_t)}{\sum_i (x_{it}-\bar x_t)^2}$$

where $x_{it}$ is the rolling beta of ticker $i$ and $y_{it}$ is its prediction score. The slope requires at least MIN_XS_NAMES names on a given day; otherwise that day's value is NaN.

Interpretation: beta_slope ≈ 0 ⇒ the scores are neutral to market beta (as intended).

Outputs: `beta_slope` — a daily series of the estimated beta loading of the scores.
def beta_exposure_series(pred_df, betas):
s = (pred_df.reset_index()
.pivot_table(index='date', columns='ticker', values='y_pred', aggfunc='last'))
b = betas.reindex(s.index).reindex(columns=s.columns)
expo = []
for dt in s.index:
y = s.loc[dt].values; x = b.loc[dt].values
m = np.isfinite(y) & np.isfinite(x)
if m.sum() < MIN_XS_NAMES: expo.append(np.nan); continue
y0 = y[m] - y[m].mean(); x0 = x[m] - x[m].mean()
expo.append(float(np.dot(x0, y0) / (np.dot(x0, x0) + 1e-12)))
return pd.Series(expo, index=s.index, name='beta_slope')
# Residual beta loading of the final scores — only meaningful when the
# beta-neutralization overlay was actually applied.
if USE_BETA_NEUTRAL:
    beta_slope = beta_exposure_series(PRED_USE, BETAS)
    print("Beta exposure mean (should be ~0):", float(beta_slope.mean()))
This section trains on walk-forward training folds and evaluates on their test folds for a set of model variants (e.g., Ridge, ElasticNet, HGBT). For each variant we build horizon-ensemble predictions (5D + 21D), optionally apply beta overlay, and backtest — then compare performance and equity curves.
1) Train/Test predictions (WF)
`_fitpredict_one_horizon(5, kind, params)` and `_fitpredict_one_horizon(21, kind, params)` produce walk-forward test predictions per horizon.
2) Train-only horizon weights (cached): `_train_ic_for_horizon(...)` computes the mean daily Spearman IC on the training span only for each horizon; weights are shrunk toward equal weights via `HORIZON_IC_SHRINK_LAMBDA` and cached by `(H, model kind, params)` to avoid recomputation.
3) Blend horizons on the test predictions.
4) Optional beta overlay (test-time): if `USE_BETA_NEUTRAL=True`, residualize scores vs. precomputed rolling betas (`BETAS_FOR_SWEEP`), then deduplicate to one (date, ticker) score.
5) Backtest & metrics (test-only): `backtest(...)` runs with the same execution settings (rebalance, top-q, costs, eligibility, per-name cap, leverage) and reports CAGR, Vol, Sharpe, MaxDD, N_days, AvgTurnover, and wall_sec. Up to `N_JOBS_VARIANTS` variants run in parallel (threading backend), with per-estimator threads limited via `threadpool_limits` to prevent oversubscription. `variant_tbl` is a sortable comparison of metrics per variant (with params and runtime). Note: all weighting (IC for horizons) uses training data only; backtests and reported metrics are strictly out-of-sample test results.
# --- 10. Model variant sweep -------------------
from functools import lru_cache
from joblib import Parallel, delayed
from threadpoolctl import threadpool_limits
import time, json, os
# How many variants to run concurrently (1 keeps things serial + tqdm-friendly).
# BUGFIX: os.cpu_count() may return None (platform-dependent), which would
# raise a TypeError in the arithmetic below — fall back to 2 (-> 1 worker).
N_JOBS_VARIANTS = max(1, min((os.cpu_count() or 2) - 1, 4))
# Prevent native (BLAS/OpenMP) thread oversubscription inside each worker
LIMIT_THREADS_PER_EST = 1
# Model grid: a small exploratory sweep, or just the single production model.
if USE_VARIANT_SWEEP:
    VARIANTS = [
        ("ridge", {"alpha": 1.0}),
        ("elasticnet", {"alpha": 0.5, "l1_ratio": 0.2}),
        ("elasticnet", {"alpha": 1.0, "l1_ratio": 0.1}),
        ("hgbt", {"max_depth": 3, "learning_rate": 0.05, "max_iter": 250}),
    ]
else:
    VARIANTS = [(PROD_MODEL_KIND, PROD_MODEL_PARAMS)]
# Precompute rolling betas once for the whole sweep when the overlay is ON.
BETAS_FOR_SWEEP = build_rolling_beta(df, lookback=BETA_LOOKBACK_DAYS) if USE_BETA_NEUTRAL else None
def _variant_label(kind: str, params: dict) -> str:
keys = ("alpha","l1_ratio","epsilon","max_depth","learning_rate","max_iter")
short = ",".join(f"{k}={params[k]}" for k in keys if k in params)
return f"{kind}({short})" if short else kind
# Memoize the train-span IC by (H, model kind, params). params are passed as a
# canonical JSON string because dicts are unhashable and lru_cache needs
# hashable arguments.
@lru_cache(maxsize=None)
def _train_ic_for_horizon_cached(H: int, kind: str, params_key: str) -> float:
    # decode the canonical params key back into a dict
    params = json.loads(params_key)
    # horizon weighting disabled (or helper cell not run): contribute 0 weight
    if not USE_HORIZON_ENSEMBLE or '_train_ic_for_horizon' not in globals():
        return 0.0
    val = _train_ic_for_horizon(H, model_kind=kind, model_params=params)
    # floor negative train ICs at 0 so they cannot flip the blend's sign
    return float(max(val, 0.0))
def run_variant(kind, params):
    """
    Fit/predict one model variant, apply overlays, backtest, and time it.

    kind: model-family key understood by _fitpredict_one_horizon.
    params: estimator hyperparameters (dict).
    Returns (stats_dict, equity_series). On any failure it returns a NaN
    stats row carrying an 'error' field plus an empty equity curve, so the
    sweep table stays intact instead of aborting.
    NOTE: this redefines the earlier run_variant(label, ...) used for the
    overlay ablation — same name, different signature.
    """
    t0 = time.perf_counter()
    label = _variant_label(kind, params)
    # canonical JSON so params can key the lru_cache above
    params_key = json.dumps(params, sort_keys=True)
    try:
        # Cap native threads per estimator to avoid oversubscription when
        # variants run concurrently.
        # NOTE(review): threadpoolctl has been observed to crash here on some
        # Windows/Anaconda installs while probing loaded DLLs (see the
        # AttributeError traceback in the cell output) — confirm the
        # environment before relying on this path.
        with threadpool_limits(limits=LIMIT_THREADS_PER_EST):
            # horizon-ensemble predictions with the given model
            _, _, P5 = _fitpredict_one_horizon(5, kind, params)
            P21 = None
            if USE_HORIZON_ENSEMBLE:
                _, _, P21 = _fitpredict_one_horizon(21, kind, params)
            if USE_HORIZON_ENSEMBLE and P21 is not None:
                # horizon weights from train-only ICs (cached)
                ic_train_by_H = {H: _train_ic_for_horizon_cached(H, kind, params_key) for H in HORIZON_LIST}
                w_raw = np.array([ic_train_by_H[H] for H in HORIZON_LIST], float)
                if w_raw.sum()==0:
                    # all train ICs non-positive -> fall back to equal weights
                    w_raw[:] = 1.0
                w_raw /= w_raw.sum()
                w_eq = np.ones_like(w_raw)/len(HORIZON_LIST)
                # shrink IC-proportional weights toward equal weights
                wH = (1 - HORIZON_IC_SHRINK_LAMBDA)*w_raw + HORIZON_IC_SHRINK_LAMBDA*w_eq
                H_WEIGHTS = dict(zip(HORIZON_LIST, wH))
                # per-date z-score each horizon, then blend
                Z5 = _cs_zscore_by_date(P5).rename(columns={'y_pred':'z5'})
                Z21 = _cs_zscore_by_date(P21).rename(columns={'y_pred':'z21'})
                Z = (Z5.reset_index().merge(Z21.reset_index(), on=['date','ticker'], how='outer'))
                # a horizon missing on a (date, ticker) contributes 0 to the blend
                Z[['z5','z21']] = Z[['z5','z21']].fillna(0.0)
                y_blend = H_WEIGHTS.get(5,0)*Z['z5'] + H_WEIGHTS.get(21,0)*Z['z21']
                PRED = (pd.DataFrame({'date':Z['date'],'ticker':Z['ticker'],'y_pred':y_blend})
                        .set_index('date')[['ticker','y_pred']].sort_index())
            else:
                # 5D only path (PROFILE P1) — P5 is already neutralized vs 1D inside _fitpredict_one_horizon
                PRED = P5.copy()
            # de-dup safety: one score per (date, ticker)
            PRED = (PRED.reset_index().sort_values(['date','ticker'])
                    .drop_duplicates(['date','ticker'], keep='last')
                    .set_index('date')[['ticker','y_pred']])
            # beta overlay (reuses the betas precomputed for the sweep)
            if USE_BETA_NEUTRAL and BETAS_FOR_SWEEP is not None:
                PRED = neutralize_vs_beta(PRED, BETAS_FOR_SWEEP,
                                          min_names=MIN_XS_NAMES,
                                          strength=BETA_NEUTRAL_STRENGTH)
                PRED = (PRED.reset_index().sort_values(['date','ticker'])
                        .drop_duplicates(['date','ticker'], keep='last')
                        .set_index('date')[['ticker','y_pred']])
            # backtest with the current risk toggles (same settings as main run)
            bt, _ = backtest(
                PRED, df[['ticker','close']],
                long_short=LONG_SHORT,
                top_q=TOP_QUANTILE,
                freq=REBALANCE_FREQ,
                holding_period=HOLDING_PERIOD,
                tcost_bps=TCOST_BPS,
                eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
                per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
                gross_leverage=GROSS_LEVERAGE,
                debug=False
            )
        stats = perf_stats(bt['ret'])
        stats.update(
            label=label,
            kind=kind,
            params=params_key,
            wall_sec=time.perf_counter() - t0,
            N_days=int(bt['ret'].shape[0]),
            AvgTurnover=float(bt['turnover'].mean()),
        )
        return stats, bt['equity'].rename(label)
    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        # keep the summary table intact: NaN row + error message, empty curve
        stats = dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan, N=0,
                     label=label, kind=kind, params=params_key, wall_sec=np.nan,
                     error=str(e))
        print(f"[variant ERROR] {label}\n{tb}")
        empty_eq = pd.Series(dtype=float, name=label)
        return stats, empty_eq
# --- Execute ----------
rows, curves = [], {}
if N_JOBS_VARIANTS > 1 and len(VARIANTS) > 1:
    print(f"[variants] running {len(VARIANTS)} models in parallel "
          f"(n_jobs={N_JOBS_VARIANTS}, per-task threads={LIMIT_THREADS_PER_EST})")
    # NOTE(review): the threading backend only parallelizes effectively if the
    # underlying fits release the GIL (native sklearn code) — confirm.
    tasks = (delayed(run_variant)(kind, params) for kind, params in VARIANTS)
    with joblib_progress(len(VARIANTS), "Model variants"):
        results = Parallel(n_jobs=N_JOBS_VARIANTS, backend="threading")(tasks)
    for s, eq in results:
        rows.append(s); curves[eq.name] = eq
else:
    # serial path: single variant (or n_jobs forced to 1)
    for kind, params in VARIANTS:
        s, eq = run_variant(kind, params)
        rows.append(s); curves[eq.name] = eq
# Summary table, best Sharpe first. The 'error' column only appears when at
# least one variant failed (run_variant adds it on the exception path).
variant_tbl = (pd.DataFrame(rows)
               .set_index('label')
               .sort_values('Sharpe', ascending=False)
               )
base_cols = ['kind','params','CAGR','Vol','Sharpe','MaxDD','N']
extra_cols = [c for c in ['N_days','AvgTurnover','wall_sec','error'] if c in variant_tbl.columns]
variant_tbl = variant_tbl.reindex(columns=base_cols + extra_cols)
display(variant_tbl.round(3))
title = "Equity by model kind (h-ens)" if len(curves) > 1 else f"Equity: {next(iter(curves))}"
multi_equity_plot(curves, title=title)
[variant ERROR] ridge
Traceback (most recent call last):
File "C:\Users\nadiy\AppData\Local\Temp\ipykernel_11216\1929571131.py", line 46, in run_variant
with threadpool_limits(limits=LIMIT_THREADS_PER_EST):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 171, in __init__
self._original_info = self._set_threadpool_limits()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 268, in _set_threadpool_limits
modules = _ThreadpoolInfo(prefixes=self._prefixes,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 340, in __init__
self._load_modules()
File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 373, in _load_modules
self._find_modules_with_enum_process_module_ex()
File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 485, in _find_modules_with_enum_process_module_ex
self._make_module_from_path(filepath)
File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 515, in _make_module_from_path
module = module_class(filepath, prefix, user_api, internal_api)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 606, in __init__
self.version = self.get_version()
^^^^^^^^^^^^^^^^^^
File "C:\Users\nadiy\anaconda3\Lib\site-packages\threadpoolctl.py", line 646, in get_version
config = get_config().split()
^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'split'
| kind | params | CAGR | Vol | Sharpe | MaxDD | N | wall_sec | error | |
|---|---|---|---|---|---|---|---|---|---|
| label | |||||||||
| ridge | ridge | {} | NaN | NaN | NaN | NaN | 0 | NaN | 'NoneType' object has no attribute 'split' |
# 11. Sweep over Holding Period (HP), Rebalance Frequency, and Top Quantile
import itertools  # already imported at the top of the file; re-import is a no-op
HP_LIST = [5, 10]  # holding periods to try (trading days)
FREQ_LIST = ['W-FRI', 'M']  # weekly-Friday vs monthly rebalancing
TOPQ_LIST = [0.10, 0.20, 0.30]  # cross-sectional top quantiles to hold
TCOST_FOR_SWEEP = TCOST_BPS # costs fixed here; cost sweep below
def run_sweep_hpfreq_topq(pred_df, prices_df,
                          hp_list=HP_LIST, freq_list=FREQ_LIST, topq_list=TOPQ_LIST,
                          tcost_bps=TCOST_FOR_SWEEP, long_short=LONG_SHORT):
    """Backtest every (holding period, rebalance freq, top quantile) combo.

    Returns (res, eq_df): a per-combo performance table sorted by Sharpe and a
    DataFrame of the corresponding equity curves, both keyed 'HP{hp}-{fq}-Q{q}'.
    """
    records = []
    curves = {}
    combos = list(itertools.product(hp_list, freq_list, topq_list))
    for hp, fq, tq in progress(combos, total=len(combos),
                               desc="Sweep HP/Freq/TopQ"):
        bt, wheld = backtest(
            pred_df, prices_df[['ticker', 'close']],
            long_short=long_short,
            top_q=tq,
            freq=fq,
            holding_period=hp,
            tcost_bps=tcost_bps,
            eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
            per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
            gross_leverage=GROSS_LEVERAGE,
            debug=False
        )
        tag = f"HP{hp}-{fq}-Q{int(tq*100)}"
        rec = perf_stats(bt['ret'])
        rec.update(dict(
            label=tag, HP=hp, Freq=fq, TopQ=tq,
            AvgTurnover=float(bt['turnover'].mean()),
            AvgNames=float((wheld.abs() > 1e-12).sum(axis=1).mean())
        ))
        curves[tag] = bt['equity'].rename(tag)
        records.append(rec)
    res = pd.DataFrame(records).set_index('label').sort_values('Sharpe', ascending=False)
    eq_df = pd.concat(curves, axis=1).dropna(how='all')
    return res, eq_df
# Run the HP/Freq/TopQ grid on the final neutralized predictions and show it.
sweep_tbl, sweep_eq = run_sweep_hpfreq_topq(PRED_USE, df)
display(sweep_tbl.round(3))
# Sharpe vs Turnover scatter
import plotly.graph_objects as go

def scatter_sharpe_vs_turnover(sweep_df, title="Sharpe vs Avg Daily Turnover"):
    """One marker per sweep config, one trace (color) per rebalance frequency."""
    tbl = sweep_df.copy()
    tbl['TopQ%'] = (tbl['TopQ'] * 100).astype(int)
    fig = go.Figure()
    for fq in tbl['Freq'].unique():
        sub = tbl[tbl['Freq'] == fq]
        fig.add_trace(go.Scatter(
            x=sub['AvgTurnover'], y=sub['Sharpe'],
            mode='markers+text', name=fq,
            text=sub.index, textposition='top center',
            marker=dict(size=10),
            hovertemplate=("Label=%{text}<br>Sharpe=%{y:.2f}"
                           "<br>Turnover=%{x:.3f}<br>HP=%{customdata[0]} "
                           "<br>TopQ=%{customdata[1]}%"),
            customdata=np.c_[sub['HP'], sub['TopQ%']]
        ))
    fig.update_layout(title=title, xaxis_title="Avg daily turnover (|Δw|1)",
                      yaxis_title="Sharpe", width=900, height=380)
    fig.show()
# Scatter the sweep, then overlay equity curves for the best configurations.
scatter_sharpe_vs_turnover(sweep_tbl)
# Overlay equity curves for the top 3 combos by Sharpe
top_labels = sweep_tbl.head(3).index.tolist()
multi_equity_plot(
    {k: sweep_eq[k] for k in top_labels},
    title=f"Equity: Top 3 sweep configs {'(h-ens)' if USE_HORIZON_ENSEMBLE else '(5D)'}"
)
Sweep HP/Freq/TopQ: 0%| | 0/12 [00:00<…
| CAGR | Vol | Sharpe | MaxDD | N | HP | Freq | TopQ | AvgTurnover | AvgNames | |
|---|---|---|---|---|---|---|---|---|---|---|
| label | ||||||||||
| HP10-M-Q20 | 0.1340 | 0.0320 | 3.9920 | -0.0250 | 832 | 10 | M | 0.2000 | 0.0210 | 23.8040 |
| HP10-M-Q30 | 0.1650 | 0.0390 | 3.9460 | -0.0380 | 832 | 10 | M | 0.3000 | 0.0290 | 34.2160 |
| HP5-M-Q20 | 0.1340 | 0.0330 | 3.8460 | -0.0240 | 832 | 5 | M | 0.2000 | 0.0210 | 21.5360 |
| HP5-M-Q30 | 0.1580 | 0.0400 | 3.6800 | -0.0390 | 832 | 5 | M | 0.3000 | 0.0290 | 31.7600 |
| HP10-W-FRI-Q20 | 0.1150 | 0.0330 | 3.3470 | -0.0170 | 849 | 10 | W-FRI | 0.2000 | 0.0460 | 37.5680 |
| HP10-W-FRI-Q30 | 0.1340 | 0.0390 | 3.2820 | -0.0250 | 849 | 10 | W-FRI | 0.3000 | 0.0580 | 49.4710 |
| HP10-M-Q10 | 0.0720 | 0.0230 | 3.0260 | -0.0180 | 832 | 10 | M | 0.1000 | 0.0130 | 12.7810 |
| HP5-M-Q10 | 0.0720 | 0.0240 | 2.8760 | -0.0160 | 832 | 5 | M | 0.1000 | 0.0130 | 11.3700 |
| HP10-W-FRI-Q10 | 0.0650 | 0.0220 | 2.8550 | -0.0120 | 849 | 10 | W-FRI | 0.1000 | 0.0270 | 20.9630 |
| HP5-W-FRI-Q20 | 0.0970 | 0.0340 | 2.7720 | -0.0180 | 849 | 5 | W-FRI | 0.2000 | 0.0780 | 31.2890 |
| HP5-W-FRI-Q30 | 0.1070 | 0.0400 | 2.5480 | -0.0270 | 849 | 5 | W-FRI | 0.3000 | 0.1010 | 43.7360 |
| HP5-W-FRI-Q10 | 0.0560 | 0.0230 | 2.3650 | -0.0130 | 849 | 5 | W-FRI | 0.1000 | 0.0460 | 16.5370 |
# 11.1 Cost sweep (HP/Freq/TopQ stay at the baseline chosen in §8)
COSTS = [0, 5, 10, 25]

def cost_sweep(pred_df, prices_df,
               hp=HOLDING_PERIOD, fq=REBALANCE_FREQ, tq=TOP_QUANTILE, long_short=LONG_SHORT):
    """Re-run the baseline backtest at each transaction-cost level in COSTS.

    Returns (table indexed by tcost_bps, dict of equity curves keyed '<bps>bps').
    """
    records, curves = [], {}
    for bps in progress(COSTS, total=len(COSTS), desc="Cost sweep"):
        bt, _ = backtest(
            pred_df, prices_df[['ticker', 'close']],
            long_short=long_short,
            top_q=tq,
            freq=fq,
            holding_period=hp,
            tcost_bps=bps,
            eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
            per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
            gross_leverage=GROSS_LEVERAGE,
            debug=False
        )
        row = perf_stats(bt['ret'])
        row.update(dict(tcost_bps=bps))
        records.append(row)
        curves[f"{bps}bps"] = bt['equity'].rename(f"{bps}bps")
    return pd.DataFrame(records).set_index('tcost_bps').sort_index(), curves
# Run the cost sweep at the baseline execution settings and show the table.
cost_tbl, cost_eq = cost_sweep(PRED_USE, df,
                               hp=HOLDING_PERIOD,
                               fq=REBALANCE_FREQ,
                               tq=TOP_QUANTILE,
                               long_short=LONG_SHORT)
display(cost_tbl.round(3))
# Plot cost equity overlays
multi_equity_plot(cost_eq, title=f"Equity by transaction costs (bps) {'(h-ens)' if USE_HORIZON_ENSEMBLE else '(5D)'}")
import plotly.graph_objects as go  # re-import is a no-op (module cache)
# Bar chart: Sharpe as a function of the assumed transaction cost.
fig = go.Figure([go.Bar(x=[str(i) for i in cost_tbl.index], y=cost_tbl['Sharpe'].values)])
mode = "(h-ens)" if USE_HORIZON_ENSEMBLE else "(5D)"
fig.update_layout(title=f"Sharpe by tcost (bps) {mode}",
                  xaxis_title="bps", yaxis_title="Sharpe",
                  width=650, height=320)
fig.show()
Cost sweep: 0%| | 0/4 [00:00<…
| CAGR | Vol | Sharpe | MaxDD | N | |
|---|---|---|---|---|---|
| tcost_bps | |||||
| 0 | 0.1410 | 0.0340 | 3.9470 | -0.0150 | 849 |
| 5 | 0.1300 | 0.0340 | 3.6540 | -0.0160 | 849 |
| 10 | 0.1190 | 0.0340 | 3.3600 | -0.0170 | 849 |
| 25 | 0.0870 | 0.0340 | 2.4790 | -0.0200 | 849 |
from datetime import datetime
# --- Checks ---
# NOTE: assert is acceptable in a notebook but would be stripped under -O.
assert 'PRED_USE' in locals(), "PRED_USE (neutralized ensemble) not defined yet."
assert 'bt_5d' in locals(), "bt_5d not defined yet."
assert 'w_held_5d' in locals(), "w_held_5d not defined yet."
# Each run gets its own timestamped directory under results/equity_daily/.
RUNS_DIR = Path("results/equity_daily")
RUNS_DIR.mkdir(parents=True, exist_ok=True)
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
out_dir = RUNS_DIR / run_id
out_dir.mkdir(parents=True, exist_ok=True)
# --- Save core artifacts ---
PRED_USE.to_parquet(out_dir / "pred_5d.parquet")
bt_5d.to_parquet(out_dir / "bt_5d.parquet")
w_held_5d.to_parquet(out_dir / "weights_held.parquet")
# --- Save config / metadata for full reproducibility ---
metadata = {
    "run_id": run_id,
    "start_date": str(START_DATE),
    "end_date": str(END_DATE),
    "target_horizons": TARGET_HORIZONS,
    "max_h": MAX_H,
    "train_years": TRAIN_YEARS,
    "step_months": STEP_MONTHS,
    "embargo_days": EMBARGO_DAYS,
    "rebalance_freq": REBALANCE_FREQ,
    "long_short": LONG_SHORT,
    "top_quantile": TOP_QUANTILE,
    "holding_period": HOLDING_PERIOD,
    "tcost_bps": TCOST_BPS,
    "random_state": RANDOM_STATE,  # NOTE(review): defined in an earlier cell not shown here — confirm
    "n_tickers": int(df['ticker'].nunique()),
    "n_rows_df": int(len(df)),
    "n_features": int(len(feature_cols)),
}
with open(out_dir / "config.json", "w") as f:
    json.dump(metadata, f, indent=2)
# --- Save feature list, tickers, and windows used ---
pd.Series(feature_cols, name="feature").to_csv(out_dir / "features.csv", index=False)
pd.Series(sorted(df['ticker'].unique()), name="ticker").to_csv(out_dir / "universe.csv", index=False)
# Store walk-forward window boundaries for audit
wf_tbl = pd.DataFrame(
    [(w.train_start, w.train_end, w.test_start, w.test_end) for w in wf_windows_run],
    columns=["train_start","train_end","test_start","test_end"]
)
wf_tbl.to_csv(out_dir / "wf_windows.csv", index=False)
# --- Performance & IC summary ---
perf = perf_stats(bt_5d['ret'])
# `x == x` is False only for NaN: keep NaNs as np.nan instead of raising.
summary = {
    "CAGR": float(perf["CAGR"]) if perf["CAGR"]==perf["CAGR"] else np.nan,
    "Vol": float(perf["Vol"]) if perf["Vol"]==perf["Vol"] else np.nan,
    "Sharpe":float(perf["Sharpe"]) if perf["Sharpe"]==perf["Sharpe"] else np.nan,
    "MaxDD": float(perf["MaxDD"]) if perf["MaxDD"]==perf["MaxDD"] else np.nan,
    "N_days":int(perf["N"]),
}
# IC summary (only present when the IC diagnostics cell was run)
if "ic_true" in locals() and isinstance(ic_true, pd.Series) and len(ic_true):
    summary.update({
        "IC_mean": float(ic_true.mean()),
        "IC_std": float(ic_true.std()),
        "IC_t": float(ic_true.mean()/ic_true.std()*np.sqrt(len(ic_true))) if ic_true.std() > 0 else np.nan,
        "IC_N": int(len(ic_true)),
    })
pd.DataFrame([summary]).to_csv(out_dir / "summary.csv", index=False)
# --- Save an equity curve PNG ---
plt.figure(figsize=(10,4))
title_str = 'Strategy Equity Curve (horizon-ensemble)' if USE_HORIZON_ENSEMBLE else 'Strategy Equity Curve (5D target)'
bt_5d['equity'].plot(title=title_str)
plt.tight_layout()
plt.savefig(out_dir / "equity_curve.png", dpi=150)
plt.close()
# --- update a 'latest' pointer ---
latest_link = RUNS_DIR / "latest"
try:
    if latest_link.exists() or latest_link.is_symlink():
        latest_link.unlink()
    latest_link.symlink_to(out_dir.resolve())
except Exception:
    # Symlinks can be unavailable (e.g. Windows without privileges):
    # fall back to a plain-text pointer file.
    with open(RUNS_DIR / "LATEST_RUN.txt", "w") as f:
        f.write(str(out_dir.resolve()))
print(f"Artifacts saved → {out_dir}")
Artifacts saved → results\equity_daily\20250902-193628
def load_run(run_path: Path):
    """Reload the artifacts saved for one run directory.

    Returns (predictions, backtest frame, held weights, config dict).
    """
    names = ("pred_5d.parquet", "bt_5d.parquet", "weights_held.parquet")
    preds, bt, weights = (pd.read_parquet(run_path / name) for name in names)
    with open(run_path / "config.json") as fh:
        cfg = json.load(fh)
    return preds, bt, weights, cfg
# Example:
# preds, bt, weights, cfg = load_run(Path("results/equity_daily/latest"))